From ccd5adc1d6273b92fd6c9a0d4817451a5acb566a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 6 Apr 2017 10:00:25 -0700 Subject: [PATCH 1/8] [SPARK-20652][sql] Store SQL UI data in the new app status store. This change replaces the SQLListener with a new implementation that saves the data to the same store used by the SparkContext's status store. For that, the types used by the old SQLListener had to be updated a bit so that they're more serialization-friendly. The interface for getting data from the store was abstracted into a new class, SQLAppStatusStore (following the convention used in core). Another change is the way that the SQL UI hooks up into the core UI or the SHS. The old "SparkHistoryListenerFactory" was replaced with a new "AppStatePlugin" that more explicitly differentiates between the two use cases: processing events, and showing the UI. Both live apps and the SHS use this new API (previously, it was restricted to the SHS). Note on the above: this causes a slight change of behavior for live apps; the SQL tab will only show up after the first execution is started. The metrics gathering code was re-worked a bit so that the types used are less memory hungry and more serialization-friendly. This reduces memory usage when using in-memory stores, and reduces load times when using disk stores. Tested with existing and added unit tests. Note one unit test was disabled because it depends on SPARK-20653, which isn't in yet. --- .../scala/org/apache/spark/SparkContext.scala | 15 +- .../deploy/history/FsHistoryProvider.scala | 12 +- .../spark/scheduler/SparkListener.scala | 12 - .../apache/spark/status/AppStatusPlugin.scala | 71 +++ .../apache/spark/status/AppStatusStore.scala | 8 +- ...park.scheduler.SparkHistoryListenerFactory | 1 - .../org.apache.spark.status.AppStatusPlugin | 1 + .../org/apache/spark/sql/SparkSession.scala | 5 - .../sql/execution/ui/AllExecutionsPage.scala | 86 ++-- .../sql/execution/ui/ExecutionPage.scala | 60 ++- .../execution/ui/SQLAppStatusListener.scala | 354 +++++++++++++++ .../sql/execution/ui/SQLAppStatusStore.scala | 174 ++++++++ .../spark/sql/execution/ui/SQLListener.scala | 403 +----------------- .../spark/sql/execution/ui/SQLTab.scala | 2 +- .../spark/sql/internal/SharedState.scala | 19 - .../execution/metric/SQLMetricsSuite.scala | 18 +- .../metric/SQLMetricsTestUtils.scala | 30 +- .../sql/execution/ui/SQLListenerSuite.scala | 331 +++++++------- .../spark/sql/test/SharedSQLContext.scala | 1 - 19 files changed, 894 insertions(+), 709 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala delete mode 100644 sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory create mode 100644 sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e5aaaf6c155eb..30a93940ef667 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -54,7 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import 
org.apache.spark.scheduler.local.LocalSchedulerBackend -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.{AppStatusPlugin, AppStatusStore} import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -246,6 +246,8 @@ class SparkContext(config: SparkConf) extends Logging { */ def isStopped: Boolean = stopped.get() + private[spark] def statusStore: AppStatusStore = _statusStore + // An asynchronous listener bus for Spark events private[spark] def listenerBus: LiveListenerBus = _listenerBus @@ -459,9 +461,14 @@ class SparkContext(config: SparkConf) extends Logging { // For tests, do not enable the UI None } - // Bind the UI before starting the task scheduler to communicate - // the bound port to the cluster manager properly - _ui.foreach(_.bind()) + _ui.foreach { ui => + // Load any plugins that might want to modify the UI. + AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui)) + + // Bind the UI before starting the task scheduler to communicate + // the bound port to the cluster manager properly + ui.bind() + } _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f16dddea9f784..bcd5b64c6c86d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -41,7 +41,7 @@ import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ -import org.apache.spark.status.{AppStatusListener, AppStatusStore, AppStatusStoreMetadata, KVUtils} +import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI @@ -318,6 +318,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val listener = if (needReplay) { val _listener = new AppStatusListener(kvstore, conf, false) replayBus.addListener(_listener) + AppStatusPlugin.loadPlugins().foreach { plugin => + plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false) + } Some(_listener) } else { None @@ -335,11 +338,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } try { - val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], - Utils.getContextOrSparkClassLoader).asScala - listenerFactories.foreach { listenerFactory => - val listeners = listenerFactory.createListeners(conf, loadedUI.ui) - listeners.foreach(replayBus.addListener) + AppStatusPlugin.loadPlugins().foreach { plugin => + plugin.setupUI(loadedUI.ui) } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index b76e560669d59..3b677ca9657db 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -167,18 +167,6 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent @DeveloperApi case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent -/** - * Interface for creating history listeners 
defined in other modules like SQL, which are used to - * rebuild the history UI. - */ -private[spark] trait SparkHistoryListenerFactory { - /** - * Create listeners used to rebuild the history UI. - */ - def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] -} - - /** * Interface for listening to events from the Spark scheduler. Most applications should probably * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class. diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala new file mode 100644 index 0000000000000..69ca02ec76293 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +/** + * An interface that defines plugins for collecting and storing application state. + * + * The plugin implementations are invoked for both live and replayed applications. For live + * applications, it's recommended that plugins defer creation of UI tabs until there's actual + * data to be shown. + */ +private[spark] trait AppStatusPlugin { + + /** + * Install listeners to collect data about the running application and populate the given + * store. + * + * @param conf The Spark configuration. + * @param store The KVStore where to keep application data. + * @param addListenerFn Function to register listeners with a bus. + * @param live Whether this is a live application (or an application being replayed by the + * HistoryServer). + */ + def setupListeners( + conf: SparkConf, + store: KVStore, + addListenerFn: SparkListener => Unit, + live: Boolean): Unit + + /** + * Install any needed extensions (tabs, pages, etc) to a Spark UI. The plugin can detect whether + * the app is live or replayed by looking at the UI's SparkContext field `sc`. + * + * @param ui The Spark UI instance for the application. 
+ */ + def setupUI(ui: SparkUI): Unit + +} + +private[spark] object AppStatusPlugin { + + def loadPlugins(): Iterable[AppStatusPlugin] = { + ServiceLoader.load(classOf[AppStatusPlugin], Utils.getContextOrSparkClassLoader).asScala + } + +} diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 2927a3227cbef..21c9a4737391f 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} /** * A wrapper around a KVStore that provides methods for accessing the API data stored within. */ -private[spark] class AppStatusStore(store: KVStore) { +private[spark] class AppStatusStore(val store: KVStore) { def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = { val it = store.view(classOf[JobDataWrapper]).asScala.map(_.info) @@ -212,9 +212,11 @@ private[spark] object AppStatusStore { */ def createLiveStore(conf: SparkConf, bus: LiveListenerBus): AppStatusStore = { val store = new InMemoryStore() - val stateStore = new AppStatusStore(store) bus.addToStatusQueue(new AppStatusListener(store, conf, true)) - stateStore + AppStatusPlugin.loadPlugins().foreach { p => + p.setupListeners(conf, store, l => bus.addToStatusQueue(l), true) + } + new AppStatusStore(store) } } diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory deleted file mode 100644 index 507100be90967..0000000000000 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin new file mode 100644 index 0000000000000..ac6d7f6962f85 --- /dev/null +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin @@ -0,0 +1 @@ +org.apache.spark.sql.execution.ui.SQLAppStatusPlugin diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index d5ab53ad8fe29..61d85360d0ddc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.internal._ import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.BaseRelation @@ -956,7 +955,6 @@ object SparkSession { sparkContext.addSparkListener(new SparkListener { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { defaultSession.set(null) - sqlListener.set(null) } }) } @@ -1025,9 +1023,6 @@ object SparkSession { */ def getDefaultSession: Option[SparkSession] = Option(defaultSession.get) - /** A global SQL listener used for the SQL UI. 
*/ - private[sql] val sqlListener = new AtomicReference[SQLListener]() - //////////////////////////////////////////////////////////////////////////////////////// // Private methods from now on //////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index f9c69864a3361..7019d98e1619f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -24,34 +24,54 @@ import scala.xml.{Node, NodeSeq} import org.apache.commons.lang3.StringEscapeUtils +import org.apache.spark.JobExecutionStatus import org.apache.spark.internal.Logging import org.apache.spark.ui.{UIUtils, WebUIPage} private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging { - private val listener = parent.listener + private val sqlStore = parent.sqlStore override def render(request: HttpServletRequest): Seq[Node] = { val currentTime = System.currentTimeMillis() - val content = listener.synchronized { + val running = new mutable.ArrayBuffer[SQLExecutionUIData]() + val completed = new mutable.ArrayBuffer[SQLExecutionUIData]() + val failed = new mutable.ArrayBuffer[SQLExecutionUIData]() + + sqlStore.executionsList().foreach { e => + val isRunning = e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING } + val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED } + if (isRunning) { + running += e + } else if (isFailed) { + failed += e + } else { + completed += e + } + } + + val content = { val _content = mutable.ListBuffer[Node]() - if (listener.getRunningExecutions.nonEmpty) { + + if (running.nonEmpty) { _content ++= new RunningExecutionTable( - parent, s"Running Queries (${listener.getRunningExecutions.size})", currentTime, - listener.getRunningExecutions.sortBy(_.submissionTime).reverse).toNodeSeq + parent, s"Running Queries (${running.size})", currentTime, + running.sortBy(_.submissionTime).reverse).toNodeSeq } - if (listener.getCompletedExecutions.nonEmpty) { + + if (completed.nonEmpty) { _content ++= new CompletedExecutionTable( - parent, s"Completed Queries (${listener.getCompletedExecutions.size})", currentTime, - listener.getCompletedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq + parent, s"Completed Queries (${completed.size})", currentTime, + completed.sortBy(_.submissionTime).reverse).toNodeSeq } - if (listener.getFailedExecutions.nonEmpty) { + + if (failed.nonEmpty) { _content ++= new FailedExecutionTable( - parent, s"Failed Queries (${listener.getFailedExecutions.size})", currentTime, - listener.getFailedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq + parent, s"Failed Queries (${failed.size})", currentTime, + failed.sortBy(_.submissionTime).reverse).toNodeSeq } _content } @@ -65,26 +85,26 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
- val metrics = listener.getExecutionMetrics(executionId) + val metrics = sqlStore.executionMetrics(executionId) + val graph = sqlStore.planGraph(executionId) summary ++ - planVisualization(metrics, executionUIData.physicalPlanGraph) ++ + planVisualization(metrics, graph) ++ physicalPlanDescription(executionUIData.physicalPlanDescription) }.getOrElse {
No information to display for Plan {executionId}
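// Illustrative sketch (not part of the patch): how another module could hook into the new
// AppStatusPlugin extension point introduced above. The names below (MyModulePlugin,
// MyModuleListener, org.apache.spark.mymodule) are hypothetical. Because the trait is
// private[spark], an implementation has to live under the org.apache.spark package, and it is
// discovered through a META-INF/services/org.apache.spark.status.AppStatusPlugin entry, the same
// way the SQL module registers SQLAppStatusPlugin in this patch.
package org.apache.spark.mymodule

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.status.AppStatusPlugin
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.kvstore.KVStore

// Collects module-specific events and writes derived, serialization-friendly data into the
// shared KVStore.
private class MyModuleListener(store: KVStore) extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    // Convert interesting events into store objects and store.write(...) them here.
  }
}

private[spark] class MyModulePlugin extends AppStatusPlugin {

  // Called both for live applications and for event-log replay in the history server.
  override def setupListeners(
      conf: SparkConf,
      store: KVStore,
      addListenerFn: SparkListener => Unit,
      live: Boolean): Unit = {
    addListenerFn(new MyModuleListener(store))
  }

  // Called whenever a SparkUI is created; a plugin may defer adding tabs until it actually has
  // data to show, as SQLAppStatusPlugin does below.
  override def setupUI(ui: SparkUI): Unit = {
    // e.g. attach a WebUITab backed by ui.store.store once there is data to display.
  }
}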
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
new file mode 100644
index 0000000000000..a413b0ee75606
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+    conf: SparkConf,
+    kvstore: KVStore,
+    live: Boolean,
+    ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionIdString == null) {
+      // This is not a job created by SQL
+      return
+    }
+
+    val executionId = executionIdString.toLong
+    val jobId = event.jobId
+    val exec = getOrCreateExecution(executionId)
+
+    // Record the accumulator IDs for the stages of this job, so that the code that keeps
+    // track of the metrics knows which accumulators to look at.
+    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+    event.stageIds.foreach { id =>
+      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
+    }
+
+    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+    exec.stages = event.stageIds
+    update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    if (!isSQLStage(event.stageInfo.stageId)) {
+      return
+    }
+
+    // Reset the metrics tracking object for the new attempt.
+    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+      metrics.taskMetrics.clear()
+      metrics.attemptId = event.stageInfo.attemptId
+    }
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    liveExecutions.values.foreach { exec =>
+      if (exec.jobs.contains(event.jobId)) {
+        val result = event.jobResult match {
+          case JobSucceeded => JobExecutionStatus.SUCCEEDED
+          case _ => JobExecutionStatus.FAILED
+        }
+        exec.jobs = exec.jobs + (event.jobId -> result)
+        exec.endEvents += 1
+        update(exec)
+      }
+    }
+  }
+
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
+      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+    }
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+    if (!isSQLStage(event.stageId)) {
+      return
+    }
+
+    val info = event.taskInfo
+    // SPARK-20342. If processing events from a live application, use the task metrics info to
+    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
+    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
+    val accums = if (live && event.taskMetrics != null) {
+      event.taskMetrics.externalAccums.flatMap { a =>
+        // This call may fail if the accumulator is gc'ed, so account for that.
+        try {
+          Some(a.toInfo(Some(a.value), None))
+        } catch {
+          case _: IllegalAccessError => None
+        }
+      }
+    } else {
+      info.accumulables
+    }
+    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
+      info.successful)
+  }
+
+  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
+    liveExecutions.get(executionId).map { exec =>
+      if (exec.metricsValues != null) {
+        exec.metricsValues
+      } else {
+        aggregateMetrics(exec)
+      }
+    }.getOrElse {
+      throw new NoSuchElementException(s"execution $executionId not found")
+    }
+  }
+
+  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
+    val metricIds = exec.metrics.map(_.accumulatorId).sorted
+    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+    val metrics = exec.stages
+      .flatMap(stageMetrics.get)
+      .flatMap(_.taskMetrics.values().asScala)
+      .flatMap { metrics => metrics.ids.zip(metrics.values) }
+
+    (metrics ++ exec.driverAccumUpdates.toSeq)
+      .filter { case (id, _) => metricIds.contains(id) }
+      .groupBy(_._1)
+      .map { case (id, values) =>
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+      }
+  }
+
+  private def updateStageMetrics(
+      stageId: Int,
+      attemptId: Int,
+      taskId: Long,
+      accumUpdates: Seq[AccumulableInfo],
+      succeeded: Boolean): Unit = {
+    stageMetrics.get(stageId).foreach { metrics =>
+      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
+        return
+      }
+
+      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
+      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
+        return
+      }
+
+      val updates = accumUpdates
+        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
+        .sortBy(_.id)
+
+      if (updates.isEmpty) {
+        return
+      }
+
+      val ids = new Array[Long](updates.size)
+      val values = new Array[Long](updates.size)
+      updates.zipWithIndex.foreach { case (acc, idx) =>
+        ids(idx) = acc.id
+        // In a live application, accumulators have Long values, but when reading from event
+        // logs, they have String values. For now, assume all accumulators are Long and convert
+        // accordingly.
+ values(idx) = acc.update.get match { + case s: String => s.toLong + case l: Long => l + case o => throw new IllegalArgumentException(s"Unexpected: $o") + } + } + + metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded)) + } + } + + private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { + // Install the SQL tab in a live app if it hasn't been initialized yet. + if (!uiInitialized) { + ui.foreach { _ui => + new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui) + } + uiInitialized = true + } + + val SparkListenerSQLExecutionStart(executionId, description, details, + physicalPlanDescription, sparkPlanInfo, time) = event + + def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { + nodes.map { + case cluster: SparkPlanGraphCluster => + val storedCluster = new SparkPlanGraphClusterWrapper( + cluster.id, + cluster.name, + cluster.desc, + toStoredNodes(cluster.nodes), + cluster.metrics) + new SparkPlanGraphNodeWrapper(null, storedCluster) + + case node => + new SparkPlanGraphNodeWrapper(node, null) + } + } + + val planGraph = SparkPlanGraph(sparkPlanInfo) + val sqlPlanMetrics = planGraph.allNodes.flatMap { node => + node.metrics.map(metric => (metric.accumulatorId, metric)) + }.toMap.values.toList + + val graphToStore = new SparkPlanGraphWrapper( + executionId, + toStoredNodes(planGraph.nodes), + planGraph.edges) + kvstore.write(graphToStore) + + val exec = getOrCreateExecution(executionId) + exec.description = description + exec.details = details + exec.physicalPlanDescription = physicalPlanDescription + exec.metrics = sqlPlanMetrics + exec.submissionTime = time + update(exec) + } + + private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { + val SparkListenerSQLExecutionEnd(executionId, time) = event + liveExecutions.get(executionId).foreach { exec => + synchronized { + exec.metricsValues = aggregateMetrics(exec) + + // Remove stale LiveStageMetrics objects for stages that are not active anymore. 
+ val activeStages = liveExecutions.values.flatMap { other => + if (other != exec) other.stages else Nil + }.toSet + stageMetrics.retain { case (id, _) => activeStages.contains(id) } + + exec.completionTime = Some(new Date(time)) + exec.endEvents += 1 + + update(exec) + } + } + } + + private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = { + val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event + liveExecutions.get(executionId).foreach { exec => + exec.driverAccumUpdates = accumUpdates.toMap + update(exec) + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: SparkListenerSQLExecutionStart => onExecutionStart(e) + case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e) + case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e) + case _ => // Ignore + } + + private def getOrCreateExecution(executionId: Long): LiveExecutionData = { + liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId)) + } + + private def update(exec: LiveExecutionData): Unit = { + val now = System.nanoTime() + if (exec.endEvents >= exec.jobs.size + 1) { + liveExecutions.remove(exec.executionId) + exec.write(kvstore, now) + } else if (liveUpdatePeriodNs >= 0) { + if (now - exec.lastWriteTime > liveUpdatePeriodNs) { + exec.write(kvstore, now) + } + } + } + + private def isSQLStage(stageId: Int): Boolean = { + liveExecutions.values.exists { exec => + exec.stages.contains(stageId) + } + } + +} + +private class LiveExecutionData(val executionId: Long) extends LiveEntity { + + var description: String = null + var details: String = null + var physicalPlanDescription: String = null + var metrics = Seq[SQLPlanMetric]() + var submissionTime = -1L + var completionTime: Option[Date] = None + + var jobs = Map[Int, JobExecutionStatus]() + var stages = Seq[Int]() + var driverAccumUpdates = Map[Long, Long]() + + var metricsValues: Map[Long, String] = null + + // Just in case job end and execution end arrive out of order, keep track of how many + // end events arrived so that the listener can stop tracking the execution. + var endEvents = 0 + + override protected def doUpdate(): Any = { + new SQLExecutionUIData( + executionId, + description, + details, + physicalPlanDescription, + metrics, + submissionTime, + completionTime, + jobs, + stages, + driverAccumUpdates, + metricsValues) + } + +} + +private class LiveStageMetrics( + val stageId: Int, + var attemptId: Int, + val accumulatorIds: Array[Long], + val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics]) + +private[sql] class LiveTaskMetrics( + val ids: Array[Long], + val values: Array[Long], + val succeeded: Boolean) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala new file mode 100644 index 0000000000000..42077f75f6a96 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.lang.{Long => JLong} +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.status.AppStatusPlugin +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +/** + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's + * no state kept in this class, so it's ok to have multiple instances of it in an application. + */ +private[sql] class SQLAppStatusStore( + store: KVStore, + listener: Option[SQLAppStatusListener] = None) { + + def executionsList(): Seq[SQLExecutionUIData] = { + store.view(classOf[SQLExecutionUIData]).asScala.toSeq + } + + def execution(executionId: Long): Option[SQLExecutionUIData] = { + try { + Some(store.read(classOf[SQLExecutionUIData], executionId)) + } catch { + case _: NoSuchElementException => None + } + } + + def executionsCount(): Long = { + store.count(classOf[SQLExecutionUIData]) + } + + def executionMetrics(executionId: Long): Map[Long, String] = { + val exec = store.read(classOf[SQLExecutionUIData], executionId) + Option(exec.metricValues) + .orElse(listener.map(_.executionMetrics(executionId))) + .getOrElse(Map()) + } + + def planGraph(executionId: Long): SparkPlanGraph = { + store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph() + } + +} + +/** + * An AppStatusPlugin for handling the SQL UI and listeners. + */ +private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { + + override def setupListeners( + conf: SparkConf, + store: KVStore, + addListenerFn: SparkListener => Unit, + live: Boolean): Unit = { + // For live applications, the listener is installed in [[setupUI]]. This also avoids adding + // the listener when the UI is disabled. Force installation during testing, though. + if (!live || Utils.isTesting) { + val listener = new SQLAppStatusListener(conf, store, live, None) + addListenerFn(listener) + } + } + + override def setupUI(ui: SparkUI): Unit = { + ui.sc match { + case Some(sc) => + // If this is a live application, then install a listener that will enable the SQL + // tab as soon as there's a SQL event posted to the bus. + val listener = new SQLAppStatusListener(sc.conf, ui.store.store, true, Some(ui)) + sc.listenerBus.addToStatusQueue(listener) + + case _ => + // For a replayed application, only add the tab if the store already contains SQL data. 
+        val sqlStore = new SQLAppStatusStore(ui.store.store)
+        if (sqlStore.executionsCount() > 0) {
+          new SQLTab(sqlStore, ui)
+        }
+    }
+  }
+
+}
+
+private[sql] class SQLExecutionUIData(
+    @KVIndexParam val executionId: Long,
+    val description: String,
+    val details: String,
+    val physicalPlanDescription: String,
+    val metrics: Seq[SQLPlanMetric],
+    val submissionTime: Long,
+    val completionTime: Option[Date],
+    @JsonDeserialize(keyAs = classOf[Integer])
+    val jobs: Map[Int, JobExecutionStatus],
+    @JsonDeserialize(contentAs = classOf[Integer])
+    val stages: Seq[Int],
+    @JsonDeserialize(keyAs = classOf[JLong], contentAs = classOf[JLong])
+    val driverAccumUpdates: Map[Long, Long],
+    /**
+     * This field is only populated after the execution is finished; it will be null while the
+     * execution is still running. During execution, aggregate metrics need to be calculated from
+     * the individual LiveStageMetrics objects and driver updates.
+     */
+    @JsonDeserialize(keyAs = classOf[JLong])
+    val metricValues: Map[Long, String]
+  )
+
+private[sql] class SparkPlanGraphWrapper(
+    @KVIndexParam val executionId: Long,
+    val nodes: Seq[SparkPlanGraphNodeWrapper],
+    val edges: Seq[SparkPlanGraphEdge]) {
+
+  def toSparkPlanGraph(): SparkPlanGraph = {
+    SparkPlanGraph(nodes.map(_.toSparkPlanGraphNode()), edges)
+  }
+
+}
+
+private[sql] class SparkPlanGraphClusterWrapper(
+    val id: Long,
+    val name: String,
+    val desc: String,
+    val nodes: Seq[SparkPlanGraphNodeWrapper],
+    val metrics: Seq[SQLPlanMetric]) {
+
+  def toSparkPlanGraphCluster(): SparkPlanGraphCluster = {
+    new SparkPlanGraphCluster(id, name, desc,
+      new ArrayBuffer() ++ nodes.map(_.toSparkPlanGraphNode()),
+      metrics)
+  }
+
+}
+
+/** Only one of the values should be set. */
+private[sql] class SparkPlanGraphNodeWrapper(
+    val node: SparkPlanGraphNode,
+    val cluster: SparkPlanGraphClusterWrapper) {
+
+  def toSparkPlanGraphNode(): SparkPlanGraphNode = {
+    assert(node == null ^ cluster == null, "One and only one of node or cluster must be set.")
+    if (node != null) node else cluster.toSparkPlanGraphCluster()
+  }
+
+}
+
+private[sql] case class SQLPlanMetric(
+    name: String,
+    accumulatorId: Long,
+    metricType: String)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 8c27af374febd..b58b8c6d45e5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -17,21 +17,15 @@ package org.apache.spark.sql.execution.ui
-import scala.collection.mutable
-
 import com.fasterxml.jackson.databind.JavaType
 import com.fasterxml.jackson.databind.`type`.TypeFactory
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.databind.util.Converter
-import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.metric._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.AccumulatorContext
 @DeveloperApi
 case class SparkListenerSQLExecutionStart(
@@ -89,398 +83,3 @@ private class LongLongTupleConverter extends Converter[(Object, Object), (Long,
     typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType,
longType)) } } - -class SQLHistoryListenerFactory extends SparkHistoryListenerFactory { - - override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = { - List(new SQLHistoryListener(conf, sparkUI)) - } -} - -class SQLListener(conf: SparkConf) extends SparkListener with Logging { - - private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000) - - private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]() - - // Old data in the following fields must be removed in "trimExecutionsIfNecessary". - // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up old data - private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]() - - /** - * Maintain the relation between job id and execution id so that we can get the execution id in - * the "onJobEnd" method. - */ - private val _jobIdToExecutionId = mutable.HashMap[Long, Long]() - - private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]() - - private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]() - - private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]() - - def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized { - _executionIdToData.toMap - } - - def jobIdToExecutionId: Map[Long, Long] = synchronized { - _jobIdToExecutionId.toMap - } - - def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized { - _stageIdToStageMetrics.toMap - } - - private def trimExecutionsIfNecessary( - executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = { - if (executions.size > retainedExecutions) { - val toRemove = math.max(retainedExecutions / 10, 1) - executions.take(toRemove).foreach { execution => - for (executionUIData <- _executionIdToData.remove(execution.executionId)) { - for (jobId <- executionUIData.jobs.keys) { - _jobIdToExecutionId.remove(jobId) - } - for (stageId <- executionUIData.stages) { - _stageIdToStageMetrics.remove(stageId) - } - } - } - executions.trimStart(toRemove) - } - } - - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) - if (executionIdString == null) { - // This is not a job created by SQL - return - } - val executionId = executionIdString.toLong - val jobId = jobStart.jobId - val stageIds = jobStart.stageIds - - synchronized { - activeExecutions.get(executionId).foreach { executionUIData => - executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING - executionUIData.stages ++= stageIds - stageIds.foreach(stageId => - _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId = 0)) - _jobIdToExecutionId(jobId) = executionId - } - } - } - - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized { - val jobId = jobEnd.jobId - for (executionId <- _jobIdToExecutionId.get(jobId); - executionUIData <- _executionIdToData.get(executionId)) { - jobEnd.jobResult match { - case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED - case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED - } - if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) { - // We are the last job of this execution, so mark the execution as finished. Note that - // `onExecutionEnd` also does this, but currently that can be called before `onJobEnd` - // since these are called on different threads. 
- markExecutionFinished(executionId) - } - } - } - - override def onExecutorMetricsUpdate( - executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized { - for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) { - updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates, finishTask = false) - } - } - - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized { - val stageId = stageSubmitted.stageInfo.stageId - val stageAttemptId = stageSubmitted.stageInfo.attemptId - // Always override metrics for old stage attempt - if (_stageIdToStageMetrics.contains(stageId)) { - _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId) - } else { - // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart". - // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution. - // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126). - } - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { - if (taskEnd.taskMetrics != null) { - updateTaskAccumulatorValues( - taskEnd.taskInfo.taskId, - taskEnd.stageId, - taskEnd.stageAttemptId, - taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)), - finishTask = true) - } - } - - /** - * Update the accumulator values of a task with the latest metrics for this task. This is called - * every time we receive an executor heartbeat or when a task finishes. - */ - protected def updateTaskAccumulatorValues( - taskId: Long, - stageId: Int, - stageAttemptID: Int, - _accumulatorUpdates: Seq[AccumulableInfo], - finishTask: Boolean): Unit = { - val accumulatorUpdates = - _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get)) - - _stageIdToStageMetrics.get(stageId) match { - case Some(stageMetrics) => - if (stageAttemptID < stageMetrics.stageAttemptId) { - // A task of an old stage attempt. Because a new stage is submitted, we can ignore it. - } else if (stageAttemptID > stageMetrics.stageAttemptId) { - logWarning(s"A task should not have a higher stageAttemptID ($stageAttemptID) then " + - s"what we have seen (${stageMetrics.stageAttemptId})") - } else { - // TODO We don't know the attemptId. Currently, what we can do is overriding the - // accumulator updates. However, if there are two same task are running, such as - // speculation, the accumulator updates will be overriding by different task attempts, - // the results will be weird. 
- stageMetrics.taskIdToMetricUpdates.get(taskId) match { - case Some(taskMetrics) => - if (finishTask) { - taskMetrics.finished = true - taskMetrics.accumulatorUpdates = accumulatorUpdates - } else if (!taskMetrics.finished) { - taskMetrics.accumulatorUpdates = accumulatorUpdates - } else { - // If a task is finished, we should not override with accumulator updates from - // heartbeat reports - } - case None => - stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics( - finished = finishTask, accumulatorUpdates) - } - } - case None => - // This execution and its stage have been dropped - } - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerSQLExecutionStart(executionId, description, details, - physicalPlanDescription, sparkPlanInfo, time) => - val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo) - val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node => - node.metrics.map(metric => metric.accumulatorId -> metric) - } - val executionUIData = new SQLExecutionUIData( - executionId, - description, - details, - physicalPlanDescription, - physicalPlanGraph, - sqlPlanMetrics.toMap, - time) - synchronized { - activeExecutions(executionId) = executionUIData - _executionIdToData(executionId) = executionUIData - } - case SparkListenerSQLExecutionEnd(executionId, time) => synchronized { - _executionIdToData.get(executionId).foreach { executionUIData => - executionUIData.completionTime = Some(time) - if (!executionUIData.hasRunningJobs) { - // onExecutionEnd happens after all "onJobEnd"s - // So we should update the execution lists. - markExecutionFinished(executionId) - } else { - // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s. - // Then we don't if the execution is successful, so let the last onJobEnd updates the - // execution lists. - } - } - } - case SparkListenerDriverAccumUpdates(executionId, accumUpdates) => synchronized { - _executionIdToData.get(executionId).foreach { executionUIData => - for ((accId, accValue) <- accumUpdates) { - executionUIData.driverAccumUpdates(accId) = accValue - } - } - } - case _ => // Ignore - } - - private def markExecutionFinished(executionId: Long): Unit = { - activeExecutions.remove(executionId).foreach { executionUIData => - if (executionUIData.isFailed) { - failedExecutions += executionUIData - trimExecutionsIfNecessary(failedExecutions) - } else { - completedExecutions += executionUIData - trimExecutionsIfNecessary(completedExecutions) - } - } - } - - def getRunningExecutions: Seq[SQLExecutionUIData] = synchronized { - activeExecutions.values.toSeq - } - - def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized { - failedExecutions - } - - def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized { - completedExecutions - } - - def getExecution(executionId: Long): Option[SQLExecutionUIData] = synchronized { - _executionIdToData.get(executionId) - } - - /** - * Get all accumulator updates from all tasks which belong to this execution and merge them. 
- */ - def getExecutionMetrics(executionId: Long): Map[Long, String] = synchronized { - _executionIdToData.get(executionId) match { - case Some(executionUIData) => - val accumulatorUpdates = { - for (stageId <- executionUIData.stages; - stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable; - taskMetrics <- stageMetrics.taskIdToMetricUpdates.values; - accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield { - (accumulatorUpdate._1, accumulatorUpdate._2) - } - } - - val driverUpdates = executionUIData.driverAccumUpdates.toSeq - val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter { - case (id, _) => executionUIData.accumulatorMetrics.contains(id) - } - mergeAccumulatorUpdates(totalUpdates, accumulatorId => - executionUIData.accumulatorMetrics(accumulatorId).metricType) - case None => - // This execution has been dropped - Map.empty - } - } - - private def mergeAccumulatorUpdates( - accumulatorUpdates: Seq[(Long, Any)], - metricTypeFunc: Long => String): Map[Long, String] = { - accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) => - val metricType = metricTypeFunc(accumulatorId) - accumulatorId -> - SQLMetrics.stringValue(metricType, values.map(_._2.asInstanceOf[Long])) - } - } - -} - - -/** - * A [[SQLListener]] for rendering the SQL UI in the history server. - */ -class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) - extends SQLListener(conf) { - - private var sqlTabAttached = false - - override def onExecutorMetricsUpdate(u: SparkListenerExecutorMetricsUpdate): Unit = { - // Do nothing; these events are not logged - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { - updateTaskAccumulatorValues( - taskEnd.taskInfo.taskId, - taskEnd.stageId, - taskEnd.stageAttemptId, - taskEnd.taskInfo.accumulables.flatMap { a => - // Filter out accumulators that are not SQL metrics - // For now we assume all SQL metrics are Long's that have been JSON serialized as String's - if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) { - val newValue = a.update.map(_.toString.toLong).getOrElse(0L) - Some(a.copy(update = Some(newValue))) - } else { - None - } - }, - finishTask = true) - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case _: SparkListenerSQLExecutionStart => - if (!sqlTabAttached) { - new SQLTab(this, sparkUI) - sqlTabAttached = true - } - super.onOtherEvent(event) - case _ => super.onOtherEvent(event) - } -} - -/** - * Represent all necessary data for an execution that will be used in Web UI. - */ -private[ui] class SQLExecutionUIData( - val executionId: Long, - val description: String, - val details: String, - val physicalPlanDescription: String, - val physicalPlanGraph: SparkPlanGraph, - val accumulatorMetrics: Map[Long, SQLPlanMetric], - val submissionTime: Long) { - - var completionTime: Option[Long] = None - - val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty - - val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer() - - val driverAccumUpdates: mutable.HashMap[Long, Long] = mutable.HashMap.empty - - /** - * Return whether there are running jobs in this execution. - */ - def hasRunningJobs: Boolean = jobs.values.exists(_ == JobExecutionStatus.RUNNING) - - /** - * Return whether there are any failed jobs in this execution. 
- */ - def isFailed: Boolean = jobs.values.exists(_ == JobExecutionStatus.FAILED) - - def runningJobs: Seq[Long] = - jobs.filter { case (_, status) => status == JobExecutionStatus.RUNNING }.keys.toSeq - - def succeededJobs: Seq[Long] = - jobs.filter { case (_, status) => status == JobExecutionStatus.SUCCEEDED }.keys.toSeq - - def failedJobs: Seq[Long] = - jobs.filter { case (_, status) => status == JobExecutionStatus.FAILED }.keys.toSeq -} - -/** - * Represent a metric in a SQLPlan. - * - * Because we cannot revert our changes for an "Accumulator", we need to maintain accumulator - * updates for each task. So that if a task is retried, we can simply override the old updates with - * the new updates of the new attempt task. Since we cannot add them to accumulator, we need to use - * "AccumulatorParam" to get the aggregation value. - */ -private[ui] case class SQLPlanMetric( - name: String, - accumulatorId: Long, - metricType: String) - -/** - * Store all accumulatorUpdates for all tasks in a Spark stage. - */ -private[ui] class SQLStageMetrics( - val stageAttemptId: Long, - val taskIdToMetricUpdates: mutable.HashMap[Long, SQLTaskMetrics] = mutable.HashMap.empty) - - -// TODO Should add attemptId here when we can get it from SparkListenerExecutorMetricsUpdate -/** - * Store all accumulatorUpdates for a Spark task. - */ -private[ui] class SQLTaskMetrics( - var finished: Boolean, - var accumulatorUpdates: Seq[(Long, Any)]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala index d0376af3e31ca..a321a22f10789 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.ui import org.apache.spark.internal.Logging import org.apache.spark.ui.{SparkUI, SparkUITab} -class SQLTab(val listener: SQLListener, sparkUI: SparkUI) +class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI) extends SparkUITab(sparkUI, "SQL") with Logging { val parent = sparkUI diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index ad9db308b2627..3e479faed72ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -32,7 +32,6 @@ import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager -import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.util.{MutableURLClassLoader, Utils} @@ -83,11 +82,6 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { */ val cacheManager: CacheManager = new CacheManager - /** - * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s. - */ - val listener: SQLListener = createListenerAndUI(sparkContext) - /** * A catalog that interacts with external systems. 
*/ @@ -142,19 +136,6 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { val jarClassLoader = new NonClosableMutableURLClassLoader( org.apache.spark.util.Utils.getContextOrSparkClassLoader) - /** - * Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI. - */ - private def createListenerAndUI(sc: SparkContext): SQLListener = { - if (SparkSession.sqlListener.get() == null) { - val listener = new SQLListener(sc.conf) - if (SparkSession.sqlListener.compareAndSet(null, listener)) { - sc.listenerBus.addToStatusQueue(listener) - sc.ui.foreach(new SQLTab(listener, _)) - } - } - SparkSession.sqlListener.get() - } } object SharedState extends Logging { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 58a194b8af62b..d588af3e19dde 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -24,6 +24,7 @@ import scala.util.Random import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.execution.ui.SQLAppStatusStore import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -32,6 +33,13 @@ import org.apache.spark.util.{AccumulatorContext, JsonProtocol} class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext { import testImplicits._ + private def statusStore: SQLAppStatusStore = { + new SQLAppStatusStore(sparkContext.statusStore.store) + } + + private def currentExecutionIds(): Set[Long] = { + statusStore.executionsList.map(_.executionId).toSet + } /** * Generates a `DataFrame` by filling randomly generated bytes for hash collision. @@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared withTempPath { file => // person creates a temporary view. get the DF before listing previous execution IDs val data = person.select('name) - sparkContext.listenerBus.waitUntilEmpty(10000) - val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet + val previousExecutionIds = currentExecutionIds() // Assume the execution plan is // PhysicalRDD(nodeId = 0) data.write.format("json").save(file.getAbsolutePath) sparkContext.listenerBus.waitUntilEmpty(10000) - val executionIds = - spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds) + val executionIds = currentExecutionIds().diff(previousExecutionIds) assert(executionIds.size === 1) val executionId = executionIds.head - val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs + val jobs = statusStore.execution(executionId).get.jobs // Use "<=" because there is a race condition that we may miss some jobs // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event. assert(jobs.size <= 1) - val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId) + val metricValues = statusStore.executionMetrics(executionId) // Because "save" will create a new DataFrame internally, we cannot get the real metric id. // However, we still can check the value. 
assert(metricValues.values.toSeq.exists(_ === "2")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index 3966e98c1ce06..d89c4b14619fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -25,7 +25,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.SparkPlanInfo -import org.apache.spark.sql.execution.ui.SparkPlanGraph +import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore} import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.util.Utils @@ -34,6 +34,14 @@ trait SQLMetricsTestUtils extends SQLTestUtils { import testImplicits._ + private def statusStore: SQLAppStatusStore = { + new SQLAppStatusStore(sparkContext.statusStore.store) + } + + private def currentExecutionIds(): Set[Long] = { + statusStore.executionsList.map(_.executionId).toSet + } + /** * Get execution metrics for the SQL execution and verify metrics values. * @@ -41,24 +49,23 @@ trait SQLMetricsTestUtils extends SQLTestUtils { * @param func the function can produce execution id after running. */ private def verifyWriteDataMetrics(metricsValues: Seq[Int])(func: => Unit): Unit = { - val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet + val previousExecutionIds = currentExecutionIds() // Run the given function to trigger query execution. func spark.sparkContext.listenerBus.waitUntilEmpty(10000) - val executionIds = - spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds) + val executionIds = currentExecutionIds().diff(previousExecutionIds) assert(executionIds.size == 1) val executionId = executionIds.head - val executionData = spark.sharedState.listener.getExecution(executionId).get - val executedNode = executionData.physicalPlanGraph.nodes.head + val executionData = statusStore.execution(executionId).get + val executedNode = statusStore.planGraph(executionId).nodes.head val metricsNames = Seq( "number of written files", "number of dynamic part", "number of output rows") - val metrics = spark.sharedState.listener.getExecutionMetrics(executionId) + val metrics = statusStore.executionMetrics(executionId) metricsNames.zip(metricsValues).foreach { case (metricsName, expected) => val sqlMetric = executedNode.metrics.find(_.name == metricsName) @@ -134,22 +141,21 @@ trait SQLMetricsTestUtils extends SQLTestUtils { expectedNumOfJobs: Int, expectedNodeIds: Set[Long], enableWholeStage: Boolean = false): Option[Map[Long, (String, Map[String, Any])]] = { - val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet + val previousExecutionIds = currentExecutionIds() withSQLConf("spark.sql.codegen.wholeStage" -> enableWholeStage.toString) { df.collect() } sparkContext.listenerBus.waitUntilEmpty(10000) - val executionIds = - spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds) + val executionIds = currentExecutionIds().diff(previousExecutionIds) assert(executionIds.size === 1) val executionId = executionIds.head - val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs + val jobs = statusStore.execution(executionId).get.jobs // Use "<=" because there is a race 
condition that we may miss some jobs // TODO Change it to "=" once we fix the race condition that missing the JobStarted event. assert(jobs.size <= expectedNumOfJobs) if (jobs.size == expectedNumOfJobs) { // If we can track all jobs, check the metric values - val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId) + val metricValues = statusStore.executionMetrics(executionId) val metrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan( df.queryExecution.executedPlan)).allNodes.filter { node => expectedNodeIds.contains(node.id) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 1055f09f5411c..fc43d077b863c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.execution.ui import java.util.Properties +import scala.collection.mutable.ListBuffer + import org.json4s.jackson.JsonMethods._ -import org.mockito.Mockito.mock import org.apache.spark._ import org.apache.spark.LocalSparkContext._ -import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config import org.apache.spark.rdd.RDD import org.apache.spark.scheduler._ @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.ui.SparkUI +import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} - +import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ - import org.apache.spark.AccumulatorSuite.makeInfo + + override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) private def createTestDataFrame: DataFrame = { Seq( @@ -68,44 +69,67 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest details = "" ) - private def createTaskInfo(taskId: Int, attemptNumber: Int): TaskInfo = new TaskInfo( - taskId = taskId, - attemptNumber = attemptNumber, - // The following fields are not used in tests - index = 0, - launchTime = 0, - executorId = "", - host = "", - taskLocality = null, - speculative = false - ) + private def createTaskInfo( + taskId: Int, + attemptNumber: Int, + accums: Map[Long, Long] = Map()): TaskInfo = { + val info = new TaskInfo( + taskId = taskId, + attemptNumber = attemptNumber, + // The following fields are not used in tests + index = 0, + launchTime = 0, + executorId = "", + host = "", + taskLocality = null, + speculative = false) + info.markFinished(TaskState.FINISHED, 1L) + info.setAccumulables(createAccumulatorInfos(accums)) + info + } - private def createTaskMetrics(accumulatorUpdates: Map[Long, Long]): TaskMetrics = { - val metrics = TaskMetrics.empty - accumulatorUpdates.foreach { case (id, update) => + private def createAccumulatorInfos(accumulatorUpdates: Map[Long, Long]): Seq[AccumulableInfo] = { + accumulatorUpdates.map { case (id, value) => val acc = new LongAccumulator - acc.metadata = AccumulatorMetadata(id, Some(""), true) - acc.add(update) - metrics.registerAccumulator(acc) + acc.metadata = 
AccumulatorMetadata(id, None, false) + acc.toInfo(Some(value), None) + }.toSeq + } + + /** Return the shared SQL store from the active SparkSession. */ + private def statusStore: SQLAppStatusStore = + new SQLAppStatusStore(spark.sparkContext.statusStore.store) + + /** + * Runs a test with a temporary SQLAppStatusStore tied to a listener bus. Events can be sent to + * the listener bus to update the store, and all data will be cleaned up at the end of the test. + */ + private def sqlStoreTest(name: String) + (fn: (SQLAppStatusStore, SparkListenerBus) => Unit): Unit = { + test(name) { + val store = new InMemoryStore() + val bus = new ReplayListenerBus() + val listener = new SQLAppStatusListener(sparkConf, store, true) + bus.addListener(listener) + val sqlStore = new SQLAppStatusStore(store, Some(listener)) + fn(sqlStore, bus) } - metrics } - test("basic") { + sqlStoreTest("basic") { (store, bus) => def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = { assert(actual.size == expected.size) - expected.foreach { e => + expected.foreach { case (id, value) => // The values in actual can be SQL metrics meaning that they contain additional formatting // when converted to string. Verify that they start with the expected value. // TODO: this is brittle. There is no requirement that the actual string needs to start // with the accumulator value. - assert(actual.contains(e._1)) - val v = actual.get(e._1).get.trim - assert(v.startsWith(e._2.toString)) + assert(actual.contains(id)) + val v = actual.get(id).get.trim + assert(v.startsWith(value.toString), s"Wrong value for accumulator $id") } } - val listener = new SQLListener(spark.sparkContext.conf) val executionId = 0 val df = createTestDataFrame val accumulatorIds = @@ -118,7 +142,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap - listener.onOtherEvent(SparkListenerSQLExecutionStart( + bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", @@ -126,9 +150,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) - val executionUIData = listener.executionIdToData(0) - - listener.onJobStart(SparkListenerJobStart( + bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( @@ -136,291 +158,261 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) + bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) - assert(listener.getExecutionMetrics(0).isEmpty) + assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // 
exception will be thrown. - listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + + bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, - createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) + bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt - listener.onTaskEnd(SparkListenerTaskEnd( + bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100)))) + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Finish two tasks - listener.onTaskEnd(SparkListenerTaskEnd( + bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 1, taskType = "", reason = null, - createTaskInfo(0, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)))) - listener.onTaskEnd(SparkListenerTaskEnd( + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)), + null)) + bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 1, taskType = "", reason = null, - createTaskInfo(1, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 3)))) + createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), + null)) - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5)) + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5)) // Summit a new stage - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0))) + bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 1, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7)) + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7)) // Finish two tasks - listener.onTaskEnd(SparkListenerTaskEnd( + bus.postToAll(SparkListenerTaskEnd( stageId = 1, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 3)))) - listener.onTaskEnd(SparkListenerTaskEnd( + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)), + null)) + bus.postToAll(SparkListenerTaskEnd( stageId = 1, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(1, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 3)))) + createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), + null)) - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11)) + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11)) - assert(executionUIData.runningJobs === Seq(0)) - assert(executionUIData.succeededJobs.isEmpty) - assert(executionUIData.failedJobs.isEmpty) + assertJobs(store.execution(0), running = Seq(0)) - listener.onJobEnd(SparkListenerJobEnd( + bus.postToAll(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), JobSucceeded )) - listener.onOtherEvent(SparkListenerSQLExecutionEnd( + bus.postToAll(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) - assert(executionUIData.runningJobs.isEmpty) - assert(executionUIData.succeededJobs === Seq(0)) - assert(executionUIData.failedJobs.isEmpty) - - checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11)) + assertJobs(store.execution(0), completed = Seq(0)) + checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11)) } - test("onExecutionEnd happens before onJobEnd(JobSucceeded)") { - val listener = new SQLListener(spark.sparkContext.conf) + sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) => val executionId = 0 val df = createTestDataFrame - listener.onOtherEvent(SparkListenerSQLExecutionStart( + bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) - listener.onJobStart(SparkListenerJobStart( + bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Nil, createProperties(executionId))) - listener.onOtherEvent(SparkListenerSQLExecutionEnd( + bus.postToAll(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) - listener.onJobEnd(SparkListenerJobEnd( + bus.postToAll(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), JobSucceeded )) - val executionUIData = listener.executionIdToData(0) - assert(executionUIData.runningJobs.isEmpty) - assert(executionUIData.succeededJobs === Seq(0)) - assert(executionUIData.failedJobs.isEmpty) + assertJobs(store.execution(0), completed = Seq(0)) } - test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { - val listener = new SQLListener(spark.sparkContext.conf) + sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) => val executionId = 0 val df = 
createTestDataFrame - listener.onOtherEvent(SparkListenerSQLExecutionStart( + bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) - listener.onJobStart(SparkListenerJobStart( + bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Nil, createProperties(executionId))) - listener.onJobEnd(SparkListenerJobEnd( + bus.postToAll(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), JobSucceeded )) - listener.onJobStart(SparkListenerJobStart( + bus.postToAll(SparkListenerJobStart( jobId = 1, time = System.currentTimeMillis(), stageInfos = Nil, createProperties(executionId))) - listener.onOtherEvent(SparkListenerSQLExecutionEnd( + bus.postToAll(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) - listener.onJobEnd(SparkListenerJobEnd( + bus.postToAll(SparkListenerJobEnd( jobId = 1, time = System.currentTimeMillis(), JobSucceeded )) - val executionUIData = listener.executionIdToData(0) - assert(executionUIData.runningJobs.isEmpty) - assert(executionUIData.succeededJobs.sorted === Seq(0, 1)) - assert(executionUIData.failedJobs.isEmpty) + assertJobs(store.execution(0), completed = Seq(0, 1)) } - test("onExecutionEnd happens before onJobEnd(JobFailed)") { - val listener = new SQLListener(spark.sparkContext.conf) + sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) => val executionId = 0 val df = createTestDataFrame - listener.onOtherEvent(SparkListenerSQLExecutionStart( + bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) - listener.onJobStart(SparkListenerJobStart( + bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq.empty, createProperties(executionId))) - listener.onOtherEvent(SparkListenerSQLExecutionEnd( + bus.postToAll(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) - listener.onJobEnd(SparkListenerJobEnd( + bus.postToAll(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), JobFailed(new RuntimeException("Oops")) )) - val executionUIData = listener.executionIdToData(0) - assert(executionUIData.runningJobs.isEmpty) - assert(executionUIData.succeededJobs.isEmpty) - assert(executionUIData.failedJobs === Seq(0)) + assertJobs(store.execution(0), failed = Seq(0)) } test("SPARK-11126: no memory leak when running non SQL jobs") { - val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size + val previousStageNumber = statusStore.executionsList().size spark.sparkContext.parallelize(1 to 10).foreach(i => ()) spark.sparkContext.listenerBus.waitUntilEmpty(10000) // listener should ignore the non SQL stage - assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber) + assert(statusStore.executionsList().size == previousStageNumber) spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ()) spark.sparkContext.listenerBus.waitUntilEmpty(10000) // listener should save the SQL stage - assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1) - } - - test("SPARK-13055: history listener only tracks SQL metrics") { - val listener = new SQLHistoryListener(sparkContext.conf, mock(classOf[SparkUI])) - // We need to post other events for the listener to track our 
accumulators. - // These are largely just boilerplate unrelated to what we're trying to test. - val df = createTestDataFrame - val executionStart = SparkListenerSQLExecutionStart( - 0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0) - val stageInfo = createStageInfo(0, 0) - val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0)) - val stageSubmitted = SparkListenerStageSubmitted(stageInfo) - // This task has both accumulators that are SQL metrics and accumulators that are not. - // The listener should only track the ones that are actually SQL metrics. - val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella") - val nonSqlMetric = sparkContext.longAccumulator("baseball") - val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None) - val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None) - val taskInfo = createTaskInfo(0, 0) - taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo)) - val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null) - listener.onOtherEvent(executionStart) - listener.onJobStart(jobStart) - listener.onStageSubmitted(stageSubmitted) - // Before SPARK-13055, this throws ClassCastException because the history listener would - // assume that the accumulator value is of type Long, but this may not be true for - // accumulators that are not SQL metrics. - listener.onTaskEnd(taskEnd) - val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { stageMetrics => - stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates) - } - // Listener tracks only SQL metrics, not other accumulators - assert(trackedAccums.size === 1) - assert(trackedAccums.head === ((sqlMetricInfo.id, sqlMetricInfo.update.get))) + assert(statusStore.executionsList().size == previousStageNumber + 1) } test("driver side SQL metrics") { - val listener = new SQLListener(spark.sparkContext.conf) - val expectedAccumValue = 12345 + val oldCount = statusStore.executionsList().size + val expectedAccumValue = 12345L val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue) - sqlContext.sparkContext.addSparkListener(listener) val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) { override lazy val sparkPlan = physicalPlan override lazy val executedPlan = physicalPlan } + SQLExecution.withNewExecutionId(spark, dummyQueryExecution) { physicalPlan.execute().collect() } - def waitTillExecutionFinished(): Unit = { - while (listener.getCompletedExecutions.isEmpty) { - Thread.sleep(100) + while (statusStore.executionsList().size < oldCount) { + Thread.sleep(100) + } + + val driverUpdates = statusStore.executionsList().last.driverAccumUpdates + assert(driverUpdates.size === 1) + assert(driverUpdates.values.head === expectedAccumValue) + } + + private def assertJobs( + exec: Option[SQLExecutionUIData], + running: Seq[Int] = Nil, + completed: Seq[Int] = Nil, + failed: Seq[Int] = Nil): Unit = { + + val actualRunning = new ListBuffer[Int]() + val actualCompleted = new ListBuffer[Int]() + val actualFailed = new ListBuffer[Int]() + + exec.get.jobs.foreach { case (jobId, jobStatus) => + jobStatus match { + case JobExecutionStatus.RUNNING => actualRunning += jobId + case JobExecutionStatus.SUCCEEDED => actualCompleted += jobId + case JobExecutionStatus.FAILED => actualFailed += jobId + case _ => fail(s"Unexpected status $jobStatus") } } - waitTillExecutionFinished() - val driverUpdates = listener.getCompletedExecutions.head.driverAccumUpdates - 
assert(driverUpdates.size == 1) - assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue) + assert(actualRunning.toSeq.sorted === running) + assert(actualCompleted.toSeq.sorted === completed) + assert(actualFailed.toSeq.sorted === failed) } test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") { @@ -490,7 +482,8 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe class SQLListenerMemoryLeakSuite extends SparkFunSuite { - test("no memory leak") { + // TODO: this feature is not yet available in SQLAppStatusStore. + ignore("no memory leak") { quietly { val conf = new SparkConf() .setMaster("local") @@ -498,7 +491,6 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite { .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly withSpark(new SparkContext(conf)) { sc => - SparkSession.sqlListener.set(null) val spark = new SparkSession(sc) import spark.implicits._ // Run 100 successful executions and 100 failed executions. @@ -516,12 +508,9 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite { } } sc.listenerBus.waitUntilEmpty(10000) - assert(spark.sharedState.listener.getCompletedExecutions.size <= 50) - assert(spark.sharedState.listener.getFailedExecutions.size <= 50) - // 50 for successful executions and 50 for failed executions - assert(spark.sharedState.listener.executionIdToData.size <= 100) - assert(spark.sharedState.listener.jobIdToExecutionId.size <= 100) - assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 100) + + val statusStore = new SQLAppStatusStore(sc.statusStore.store) + assert(statusStore.executionsList().size <= 50) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index cd8d0708d8a32..9324585ec04d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -64,7 +64,6 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua * Initialize the [[TestSparkSession]]. */ protected override def beforeAll(): Unit = { - SparkSession.sqlListener.set(null) if (_spark == null) { _spark = createSparkSession } From c4af460bc995a5021a26edc5e9709d5d81f1c09b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 6 Nov 2017 17:05:41 -0800 Subject: [PATCH 2/8] Don't store driver accum updates in the status store. 
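The change here, in outline: driver-side accumulator updates stay on the live, in-memory execution data only, and get folded into the aggregated metric values when the execution ends, so the object persisted to the status store no longer carries a separate driverAccumUpdates field. A minimal sketch of that shape, using hypothetical stand-in types (LiveExecution and StoredExecution are not the real Spark classes, and a plain sum stands in for SQLMetrics.stringValue):

    // Hypothetical stand-ins for LiveExecutionData / SQLExecutionUIData.
    class LiveExecution(val executionId: Long) {
      // Live-only state: never written to the store.
      var driverAccumUpdates: Map[Long, Long] = Map.empty
      var taskAccumUpdates: Seq[(Long, Long)] = Nil    // (accumulator id, value) per finished task
      // Final snapshot; stays null while the execution is running.
      var metricValues: Map[Long, String] = null

      def aggregate(): Map[Long, String] =
        (taskAccumUpdates ++ driverAccumUpdates.toSeq)
          .groupBy(_._1)
          .map { case (id, values) => id -> values.map(_._2).sum.toString }
    }

    // The persisted shape: only the already-aggregated strings, no driver-update map.
    case class StoredExecution(executionId: Long, metricValues: Map[Long, String])

    object DriverAccumSketch {
      def persistOnEnd(exec: LiveExecution): StoredExecution = {
        exec.metricValues = exec.aggregate()
        StoredExecution(exec.executionId, exec.metricValues)
      }
    }

The test change below follows the same shape: rather than asserting on a stored driver-update map, it waits for metricValues to be populated and then checks the formatted metric value.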
--- .../sql/execution/ui/SQLAppStatusListener.scala | 3 +-- .../sql/execution/ui/SQLAppStatusStore.scala | 6 ++---- .../spark/sql/execution/ui/SQLListenerSuite.scala | 15 ++++++++++++--- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index a413b0ee75606..1a964f4794a35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -229,7 +229,7 @@ private[sql] class SQLAppStatusListener( val planGraph = SparkPlanGraph(sparkPlanInfo) val sqlPlanMetrics = planGraph.allNodes.flatMap { node => - node.metrics.map(metric => (metric.accumulatorId, metric)) + node.metrics.map { metric => (metric.accumulatorId, metric) } }.toMap.values.toList val graphToStore = new SparkPlanGraphWrapper( @@ -336,7 +336,6 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { completionTime, jobs, stages, - driverAccumUpdates, metricsValues) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 42077f75f6a96..80e45c5512050 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -119,12 +119,10 @@ private[sql] class SQLExecutionUIData( val jobs: Map[Int, JobExecutionStatus], @JsonDeserialize(contentAs = classOf[Integer]) val stages: Seq[Int], - @JsonDeserialize(keyAs = classOf[JLong], contentAs = classOf[JLong]) - val driverAccumUpdates: Map[Long, Long], /** * This field is only populated after the execution is finished; it will be null while the - * execution is still running. During execution, aggregate metrics need to be calculated from - * the individal SQLStageMetrics objects and driver updates. + * execution is still running. During execution, aggregate metrics need to be retrieved + * from the SQL listener instance. */ @JsonDeserialize(keyAs = classOf[JLong]) val metricValues: Map[Long, String] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index fc43d077b863c..eba8d55daad58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -386,9 +386,18 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest Thread.sleep(100) } - val driverUpdates = statusStore.executionsList().last.driverAccumUpdates - assert(driverUpdates.size === 1) - assert(driverUpdates.values.head === expectedAccumValue) + // Wait for listener to finish computing the metrics for the execution. 
+ while (statusStore.executionsList().last.metricValues == null) { + Thread.sleep(100) + } + + val execId = statusStore.executionsList().last.executionId + val metrics = statusStore.executionMetrics(execId) + val driverMetric = physicalPlan.metrics("dummy") + val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue)) + + assert(metrics.contains(driverMetric.id)) + assert(metrics(driverMetric.id) === expectedValue) } private def assertJobs( From 46695edece81057986e7a0b7f87a0eacbe0b0fca Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 7 Nov 2017 10:49:11 -0800 Subject: [PATCH 3/8] Typo. --- .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 1a964f4794a35..fce06dd158551 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -40,7 +40,7 @@ private[sql] class SQLAppStatusListener( ui: Option[SparkUI] = None) extends SparkListener with Logging { - // How often to flush intermediate statge of a live execution to the store. When replaying logs, + // How often to flush intermediate stage of a live execution to the store. When replaying logs, // never flush (only do the very last write). private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L From 197dd8fe645d3672c6e0c0ac0f52144a84b91dc5 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 7 Nov 2017 15:39:06 -0800 Subject: [PATCH 4/8] Use set to keep stage list. 
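An execution's stage ids are only used for membership checks (is this stage part of a SQL execution?) and for looking up per-stage metrics, so a Set[Int] fits better than a Seq[Int]: constant-time contains and no duplicate entries. A tiny illustrative sketch with placeholder names:

    object StageSetSketch {
      // Stage ids as they might arrive on a job start event; duplicates are possible.
      val stageIdsFromJobStart: Seq[Int] = Seq(3, 4, 4, 5)

      // Set[Int]: de-duplicated, constant-time membership tests.
      val trackedStages: Set[Int] = stageIdsFromJobStart.toSet

      def isSqlStage(stageId: Int): Boolean = trackedStages.contains(stageId)

      def main(args: Array[String]): Unit = {
        assert(isSqlStage(4))
        assert(!isSqlStage(7))
      }
    }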
--- .../spark/sql/execution/ui/SQLAppStatusListener.scala | 6 +++--- .../apache/spark/sql/execution/ui/SQLAppStatusStore.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index fce06dd158551..520fe1d14bd8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -68,7 +68,7 @@ private[sql] class SQLAppStatusListener( } exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) - exec.stages = event.stageIds + exec.stages = event.stageIds.toSet update(exec) } @@ -144,7 +144,7 @@ private[sql] class SQLAppStatusListener( private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized { val metricIds = exec.metrics.map(_.accumulatorId).sorted val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap - val metrics = exec.stages + val metrics = exec.stages.toSeq .flatMap(stageMetrics.get) .flatMap(_.taskMetrics.values().asScala) .flatMap { metrics => metrics.ids.zip(metrics.values) } @@ -316,7 +316,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { var completionTime: Option[Date] = None var jobs = Map[Int, JobExecutionStatus]() - var stages = Seq[Int]() + var stages = Set[Int]() var driverAccumUpdates = Map[Long, Long]() var metricsValues: Map[Long, String] = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 80e45c5512050..8b85c08d9ed17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -118,7 +118,7 @@ private[sql] class SQLExecutionUIData( @JsonDeserialize(keyAs = classOf[Integer]) val jobs: Map[Int, JobExecutionStatus], @JsonDeserialize(contentAs = classOf[Integer]) - val stages: Seq[Int], + val stages: Set[Int], /** * This field is only populated after the execution is finished; it will be null while the * execution is still running. During execution, aggregate metrics need to be retrieved From ecf293b31fa1b5250f484d6b2f09373e7057bbc3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 7 Nov 2017 21:44:22 -0800 Subject: [PATCH 5/8] Typo. --- .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 520fe1d14bd8e..46f3fc822f662 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -40,7 +40,7 @@ private[sql] class SQLAppStatusListener( ui: Option[SparkUI] = None) extends SparkListener with Logging { - // How often to flush intermediate stage of a live execution to the store. When replaying logs, + // How often to flush intermediate state of a live execution to the store. When replaying logs, // never flush (only do the very last write). 
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L From a42f418bc5a84260f8b02b6db4e03847c764f362 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 8 Nov 2017 13:10:22 -0800 Subject: [PATCH 6/8] Simplify and fix synchronization for live executions. --- .../execution/ui/SQLAppStatusListener.scala | 76 +++++++++++-------- .../sql/execution/ui/SQLAppStatusStore.scala | 5 +- 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 46f3fc822f662..4343b5f37f90d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -18,16 +18,15 @@ package org.apache.spark.sql.execution.ui import java.util.Date import java.util.concurrent.ConcurrentHashMap +import java.util.function.Function import scala.collection.JavaConverters._ -import scala.collection.mutable.HashMap import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.metric._ -import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.status.LiveEntity import org.apache.spark.status.config._ import org.apache.spark.ui.SparkUI @@ -44,8 +43,11 @@ private[sql] class SQLAppStatusListener( // never flush (only do the very last write). private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L - private val liveExecutions = new HashMap[Long, LiveExecutionData]() - private val stageMetrics = new HashMap[Int, LiveStageMetrics]() + // Live tracked data is needed by the SQL status store to calculate metrics for in-flight + // executions; that means arbitrary threads may be querying these maps, so they need to be + // thread-safe. + private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() + private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() private var uiInitialized = false @@ -78,14 +80,14 @@ private[sql] class SQLAppStatusListener( } // Reset the metrics tracking object for the new attempt. 
- stageMetrics.get(event.stageInfo.stageId).foreach { metrics => + Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics => metrics.taskMetrics.clear() metrics.attemptId = event.stageInfo.attemptId } } override def onJobEnd(event: SparkListenerJobEnd): Unit = { - liveExecutions.values.foreach { exec => + liveExecutions.values().asScala.foreach { exec => if (exec.jobs.contains(event.jobId)) { val result = event.jobResult match { case JobSucceeded => JobExecutionStatus.SUCCEEDED @@ -129,8 +131,8 @@ private[sql] class SQLAppStatusListener( info.successful) } - def executionMetrics(executionId: Long): Map[Long, String] = synchronized { - liveExecutions.get(executionId).map { exec => + def executionMetrics(executionId: Long): Map[Long, String] = { + Option(liveExecutions.get(executionId)).map { exec => if (exec.metricsValues != null) { exec.metricsValues } else { @@ -141,20 +143,30 @@ private[sql] class SQLAppStatusListener( } } - private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized { + private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = { val metricIds = exec.metrics.map(_.accumulatorId).sorted val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap val metrics = exec.stages.toSeq - .flatMap(stageMetrics.get) + .flatMap { stageId => Option(stageMetrics.get(stageId)) } .flatMap(_.taskMetrics.values().asScala) .flatMap { metrics => metrics.ids.zip(metrics.values) } - (metrics ++ exec.driverAccumUpdates.toSeq) + val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq) .filter { case (id, _) => metricIds.contains(id) } .groupBy(_._1) .map { case (id, values) => id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq) } + + // Check the execution again for whether the aggregated metrics data has been calculated. + // This can happen if the UI is requesting this data, and the onExecutionEnd handler is + // running at the same time. The metrics calculcated for the UI can be innacurate in that + // case, since the onExecutionEnd handler will clean up tracked stage metrics. + if (exec.metricsValues != null) { + exec.metricsValues + } else { + aggregatedMetrics + } } private def updateStageMetrics( @@ -163,7 +175,7 @@ private[sql] class SQLAppStatusListener( taskId: Long, accumUpdates: Seq[AccumulableInfo], succeeded: Boolean): Unit = { - stageMetrics.get(stageId).foreach { metrics => + Option(stageMetrics.get(stageId)).foreach { metrics => if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) { return } @@ -195,6 +207,7 @@ private[sql] class SQLAppStatusListener( } } + // TODO: storing metrics by task ID can lead to innacurate metrics when speculation is on. metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded)) } } @@ -249,27 +262,25 @@ private[sql] class SQLAppStatusListener( private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { val SparkListenerSQLExecutionEnd(executionId, time) = event - liveExecutions.get(executionId).foreach { exec => - synchronized { - exec.metricsValues = aggregateMetrics(exec) - - // Remove stale LiveStageMetrics objects for stages that are not active anymore. 
- val activeStages = liveExecutions.values.flatMap { other => - if (other != exec) other.stages else Nil - }.toSet - stageMetrics.retain { case (id, _) => activeStages.contains(id) } - - exec.completionTime = Some(new Date(time)) - exec.endEvents += 1 + Option(liveExecutions.get(executionId)).foreach { exec => + exec.metricsValues = aggregateMetrics(exec) + exec.completionTime = Some(new Date(time)) + exec.endEvents += 1 + update(exec) - update(exec) - } + // Remove stale LiveStageMetrics objects for stages that are not active anymore. + val activeStages = liveExecutions.values().asScala.flatMap { other => + if (other != exec) other.stages else Nil + }.toSet + stageMetrics.keySet().asScala + .filter(!activeStages.contains(_)) + .foreach(stageMetrics.remove) } } private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = { val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event - liveExecutions.get(executionId).foreach { exec => + Option(liveExecutions.get(executionId)).foreach { exec => exec.driverAccumUpdates = accumUpdates.toMap update(exec) } @@ -283,14 +294,17 @@ private[sql] class SQLAppStatusListener( } private def getOrCreateExecution(executionId: Long): LiveExecutionData = { - liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId)) + liveExecutions.computeIfAbsent(executionId, + new Function[Long, LiveExecutionData]() { + override def apply(key: Long): LiveExecutionData = new LiveExecutionData(executionId) + }) } private def update(exec: LiveExecutionData): Unit = { val now = System.nanoTime() if (exec.endEvents >= exec.jobs.size + 1) { - liveExecutions.remove(exec.executionId) exec.write(kvstore, now) + liveExecutions.remove(exec.executionId) } else if (liveUpdatePeriodNs >= 0) { if (now - exec.lastWriteTime > liveUpdatePeriodNs) { exec.write(kvstore, now) @@ -299,7 +313,7 @@ private[sql] class SQLAppStatusListener( } private def isSQLStage(stageId: Int): Boolean = { - liveExecutions.values.exists { exec => + liveExecutions.values().asScala.exists { exec => exec.stages.contains(stageId) } } @@ -319,7 +333,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { var stages = Set[Int]() var driverAccumUpdates = Map[Long, Long]() - var metricsValues: Map[Long, String] = null + @volatile var metricsValues: Map[Long, String] = null // Just in case job end and execution end arrive out of order, keep track of how many // end events arrived so that the listener can stop tracking the execution. 
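With liveExecutions now a ConcurrentHashMap, the lookup and the creation in getOrCreateExecution above have to happen as one atomic step; computeIfAbsent guarantees the factory runs at most once per key even if the listener thread and a UI or status-store reader race on the same execution id. A self-contained sketch of just that pattern (LiveExec is a stand-in type; the explicit java.util.function.Function wrapper, rather than a lambda, keeps this compiling on Scala 2.11, which lacks the SAM conversion added in 2.12):

    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.Function

    class LiveExec(val executionId: Long)   // stand-in for LiveExecutionData

    object GetOrCreateSketch {
      private val liveExecutions = new ConcurrentHashMap[Long, LiveExec]()

      def getOrCreate(executionId: Long): LiveExec = {
        // Atomic get-or-create: the factory runs at most once per key, so all
        // concurrent callers observe the same LiveExec instance.
        liveExecutions.computeIfAbsent(executionId,
          new Function[Long, LiveExec]() {
            override def apply(key: Long): LiveExec = new LiveExec(key)
          })
      }
    }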
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 8b85c08d9ed17..b794bb241d8b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.{JobExecutionStatus, SparkConf} -import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.scheduler.SparkListener import org.apache.spark.status.AppStatusPlugin import org.apache.spark.status.KVUtils.KVIndexParam import org.apache.spark.ui.SparkUI @@ -59,6 +59,9 @@ private[sql] class SQLAppStatusStore( def executionMetrics(executionId: Long): Map[Long, String] = { val exec = store.read(classOf[SQLExecutionUIData], executionId) + + // Need to try to read from the underlying store twice, in case the live execution data is + // removed from the listener while this method is running. Option(exec.metricValues) .orElse(listener.map(_.executionMetrics(executionId))) .getOrElse(Map()) From bb7388b86d7adf8bbf209cf7748c319c4b8c0c77 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 8 Nov 2017 18:35:19 -0800 Subject: [PATCH 7/8] Remove stale comment. --- .../org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index b794bb241d8b8..fb7608b44eec0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -59,9 +59,6 @@ private[sql] class SQLAppStatusStore( def executionMetrics(executionId: Long): Map[Long, String] = { val exec = store.read(classOf[SQLExecutionUIData], executionId) - - // Need to try to read from the underlying store twice, in case the live execution data is - // removed from the listener while this method is running. Option(exec.metricValues) .orElse(listener.map(_.executionMetrics(executionId))) .getOrElse(Map()) From 1a31665ab6d3352dee3e15c87a697a7e655eb34c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 9 Nov 2017 15:19:30 -0800 Subject: [PATCH 8/8] Fix a race. 
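The race being fixed: the listener writes an execution's final snapshot and then drops its live copy, so a reader that checks the store first and the listener second can miss both if the hand-off happens in between. Reading the store a second time closes the window, because once the live lookup comes back empty the final values are guaranteed to have been written already. A simplified sketch of that read ordering, with hypothetical Store and LiveListener traits standing in for the real KVStore and SQLAppStatusListener:

    // Hypothetical, simplified interfaces; only the read ordering matters here.
    trait Store { def metricsFor(executionId: Long): Option[Map[Long, String]] }
    trait LiveListener { def liveExecutionMetrics(executionId: Long): Option[Map[Long, String]] }

    class MetricsReader(store: Store, listener: Option[LiveListener]) {
      def executionMetrics(executionId: Long): Map[Long, String] = {
        def fromStore(): Option[Map[Long, String]] = store.metricsFor(executionId)

        fromStore()                                                      // 1) finished executions
          .orElse(listener.flatMap(_.liveExecutionMetrics(executionId))) // 2) still-live executions
          // 3) the execution may have finished, and been dropped from the listener,
          //    between steps 1 and 2; the store holds the final values by now.
          .orElse(fromStore())
          .getOrElse(Map())
      }
    }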
--- .../sql/execution/ui/SQLAppStatusListener.scala | 7 +++---- .../spark/sql/execution/ui/SQLAppStatusStore.scala | 13 ++++++++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 4343b5f37f90d..43cec4807ae4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -131,15 +131,13 @@ private[sql] class SQLAppStatusListener( info.successful) } - def executionMetrics(executionId: Long): Map[Long, String] = { + def liveExecutionMetrics(executionId: Long): Option[Map[Long, String]] = { Option(liveExecutions.get(executionId)).map { exec => if (exec.metricsValues != null) { exec.metricsValues } else { aggregateMetrics(exec) } - }.getOrElse { - throw new NoSuchElementException(s"execution $executionId not found") } } @@ -207,7 +205,8 @@ private[sql] class SQLAppStatusListener( } } - // TODO: storing metrics by task ID can lead to innacurate metrics when speculation is on. + // TODO: storing metrics by task ID can cause metrics for the same task index to be + // counted multiple times, for example due to speculation or re-attempts. metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index fb7608b44eec0..586d3ae411c74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -58,9 +58,16 @@ private[sql] class SQLAppStatusStore( } def executionMetrics(executionId: Long): Map[Long, String] = { - val exec = store.read(classOf[SQLExecutionUIData], executionId) - Option(exec.metricValues) - .orElse(listener.map(_.executionMetrics(executionId))) + def metricsFromStore(): Option[Map[Long, String]] = { + val exec = store.read(classOf[SQLExecutionUIData], executionId) + Option(exec.metricValues) + } + + metricsFromStore() + .orElse(listener.flatMap(_.liveExecutionMetrics(executionId))) + // Try a second time in case the execution finished while this method is trying to + // get the metrics. + .orElse(metricsFromStore()) .getOrElse(Map()) }
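For the second store read above to be safe, the listener's update path has to persist the final snapshot before dropping the live entry, and it throttles intermediate writes for still-running executions by the configured live-update period. A rough sketch of that flush policy, with simplified stand-in types (Exec is not the real LiveExecutionData, and write represents persisting the snapshot to the KVStore):

    import java.util.concurrent.ConcurrentHashMap

    // Stand-in for LiveExecutionData: just the fields the flush decision needs.
    class Exec(val id: Long) {
      @volatile var jobs: Map[Int, String] = Map.empty
      @volatile var endEvents: Int = 0
      @volatile var lastWriteTime: Long = -1L
    }

    // liveUpdatePeriodNs < 0 means "never flush while live" (the replay/history case).
    class FlushPolicy(liveUpdatePeriodNs: Long, write: Exec => Unit) {
      private val live = new ConcurrentHashMap[Long, Exec]()

      def track(exec: Exec): Unit = live.put(exec.id, exec)

      def update(exec: Exec, now: Long = System.nanoTime()): Unit = {
        if (exec.endEvents >= exec.jobs.size + 1) {
          // All job ends plus the execution end have arrived: persist the final snapshot
          // first, and only then drop the live entry, so a concurrent reader that misses
          // the live copy is guaranteed to find the stored one.
          write(exec)
          live.remove(exec.id)
        } else if (liveUpdatePeriodNs >= 0 && now - exec.lastWriteTime > liveUpdatePeriodNs) {
          // Still running: throttle intermediate writes to at most one per update period.
          write(exec)
          exec.lastWriteTime = now
        }
      }
    }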