Commit 0ffa7c4

Marcelo Vanzin authored and squito committed
[SPARK-20652][SQL] Store SQL UI data in the new app status store.
This change replaces the SQLListener with a new implementation that saves the data to the same store used by the SparkContext's status store. For that, the types used by the old SQLListener had to be updated a bit so that they're more serialization-friendly.

The interface for getting data from the store was abstracted into a new class, SQLAppStatusStore (following the convention used in core).

Another change is the way that the SQL UI hooks up into the core UI or the SHS. The old "SparkHistoryListenerFactory" was replaced with a new "AppStatusPlugin" that more explicitly differentiates between the two use cases: processing events, and showing the UI. Both live apps and the SHS use this new API (previously, it was restricted to the SHS).

Note on the above: this causes a slight change of behavior for live apps; the SQL tab will only show up after the first execution is started.

The metrics gathering code was reworked a bit so that the types used are less memory-hungry and more serialization-friendly. This reduces memory usage with in-memory stores, and reduces load times with disk stores.

Tested with existing and added unit tests. Note one unit test was disabled because it depends on SPARK-20653, which isn't in yet.

Author: Marcelo Vanzin <[email protected]>

Closes #19681 from vanzin/SPARK-20652.
1 parent 4741c07 · commit 0ffa7c4

File tree: 19 files changed, +920 −709 lines
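The central idea of the change is that UI state now lives in a KVStore (org.apache.spark.util.kvstore) instead of in listener-held collections: listeners write small, serialization-friendly wrapper objects into the store as events arrive, and UI pages read typed views back out. The following self-contained sketch illustrates that read/write pattern, assuming only the spark-kvstore module on the classpath; the DemoExecution type and KVStoreSketch object are hypothetical stand-ins for the wrapper types the real code stores (e.g. the ones read by AppStatusStore and SQLAppStatusStore below).

import scala.collection.JavaConverters._

import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical wrapper type; the real code stores types like SQLExecutionUIData.
class DemoExecution(val executionId: Long, val description: String) {
  // Every type kept in a KVStore needs a natural index.
  @KVIndex def id: Long = executionId
}

object KVStoreSketch {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore()

    // A listener would perform writes like these as events arrive.
    store.write(new DemoExecution(1L, "first query"))
    store.write(new DemoExecution(2L, "second query"))

    // A UI page reads an ordered, typed view back out of the store.
    store.view(classOf[DemoExecution]).asScala.foreach { e =>
      println(s"execution ${e.executionId}: ${e.description}")
    }

    store.close()
  }
}

Because the stored objects are plain and serialization-friendly, the same pattern works with an in-memory store for live apps and a disk-backed store in the SHS, which is where the memory and load-time savings described above come from.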

core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 11 additions & 4 deletions

@@ -54,7 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}

@@ -246,6 +246,8 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def isStopped: Boolean = stopped.get()

+  private[spark] def statusStore: AppStatusStore = _statusStore
+
   // An asynchronous listener bus for Spark events
   private[spark] def listenerBus: LiveListenerBus = _listenerBus

@@ -455,9 +457,14 @@
       // For tests, do not enable the UI
       None
     }
-    // Bind the UI before starting the task scheduler to communicate
-    // the bound port to the cluster manager properly
-    _ui.foreach(_.bind())
+    _ui.foreach { ui =>
+      // Load any plugins that might want to modify the UI.
+      AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
+
+      // Bind the UI before starting the task scheduler to communicate
+      // the bound port to the cluster manager properly
+      ui.bind()
+    }

     _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
Lines changed: 6 additions & 6 deletions

@@ -41,7 +41,7 @@ import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
-import org.apache.spark.status.{AppStatusListener, AppStatusStore, AppStatusStoreMetadata, KVUtils}
+import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.SparkUI

@@ -319,6 +319,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val _listener = new AppStatusListener(kvstore, conf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
       replayBus.addListener(_listener)
+      AppStatusPlugin.loadPlugins().foreach { plugin =>
+        plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
+      }
       Some(_listener)
     } else {
       None

@@ -333,11 +336,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }

     try {
-      val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
-        Utils.getContextOrSparkClassLoader).asScala
-      listenerFactories.foreach { listenerFactory =>
-        val listeners = listenerFactory.createListeners(conf, loadedUI.ui)
-        listeners.foreach(replayBus.addListener)
+      AppStatusPlugin.loadPlugins().foreach { plugin =>
+        plugin.setupUI(loadedUI.ui)
       }

       val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
Lines changed: 0 additions & 12 deletions

@@ -167,18 +167,6 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

-/**
- * Interface for creating history listeners defined in other modules like SQL, which are used to
- * rebuild the history UI.
- */
-private[spark] trait SparkHistoryListenerFactory {
-  /**
-   * Create listeners used to rebuild the history UI.
-   */
-  def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
-}
-
 /**
  * Interface for listening to events from the Spark scheduler. Most applications should probably
  * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala (new file)
Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * An interface that defines plugins for collecting and storing application state.
+ *
+ * The plugin implementations are invoked for both live and replayed applications. For live
+ * applications, it's recommended that plugins defer creation of UI tabs until there's actual
+ * data to be shown.
+ */
+private[spark] trait AppStatusPlugin {
+
+  /**
+   * Install listeners to collect data about the running application and populate the given
+   * store.
+   *
+   * @param conf The Spark configuration.
+   * @param store The KVStore where to keep application data.
+   * @param addListenerFn Function to register listeners with a bus.
+   * @param live Whether this is a live application (or an application being replayed by the
+   *             HistoryServer).
+   */
+  def setupListeners(
+      conf: SparkConf,
+      store: KVStore,
+      addListenerFn: SparkListener => Unit,
+      live: Boolean): Unit
+
+  /**
+   * Install any needed extensions (tabs, pages, etc) to a Spark UI. The plugin can detect whether
+   * the app is live or replayed by looking at the UI's SparkContext field `sc`.
+   *
+   * @param ui The Spark UI instance for the application.
+   */
+  def setupUI(ui: SparkUI): Unit
+
+}
+
+private[spark] object AppStatusPlugin {
+
+  def loadPlugins(): Iterable[AppStatusPlugin] = {
+    ServiceLoader.load(classOf[AppStatusPlugin], Utils.getContextOrSparkClassLoader).asScala
+  }
+
+}
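To make the contract concrete, here is a minimal sketch of what an implementation of this trait could look like. ExampleStatusPlugin and its listener body are hypothetical illustrations, not part of this commit; the real implementation added here is SQLAppStatusPlugin in the SQL module.

package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.status.AppStatusPlugin
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.kvstore.KVStore

// Hypothetical plugin; it must live under org.apache.spark since the trait is
// private[spark], and it needs a no-arg constructor for ServiceLoader.
private[spark] class ExampleStatusPlugin extends AppStatusPlugin {

  // Runs for live apps (live = true) and for SHS replays (live = false).
  override def setupListeners(
      conf: SparkConf,
      store: KVStore,
      addListenerFn: SparkListener => Unit,
      live: Boolean): Unit = {
    addListenerFn(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // Write module-specific wrapper objects into `store` here.
      }
    })
  }

  // Runs when a live SparkUI is created or an SHS UI is rebuilt; the UI's
  // `sc` field is defined for live apps and empty on replay.
  override def setupUI(ui: SparkUI): Unit = {
    // Attach tabs/pages here; for live apps, prefer deferring tab creation
    // until there is data to show, per the trait's scaladoc.
  }
}

Discovery relies on ServiceLoader, so a jar shipping such a plugin also registers the class in a META-INF/services file, as the SQL module does further down.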

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
Lines changed: 5 additions & 3 deletions

@@ -32,7 +32,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 /**
  * A wrapper around a KVStore that provides methods for accessing the API data stored within.
  */
-private[spark] class AppStatusStore(store: KVStore) {
+private[spark] class AppStatusStore(val store: KVStore) {

   def applicationInfo(): v1.ApplicationInfo = {
     store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info

@@ -338,9 +338,11 @@
    */
   def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
     val store = new InMemoryStore()
-    val stateStore = new AppStatusStore(store)
     addListenerFn(new AppStatusListener(store, conf, true))
-    stateStore
+    AppStatusPlugin.loadPlugins().foreach { p =>
+      p.setupListeners(conf, store, addListenerFn, true)
+    }
+    new AppStatusStore(store)
   }

 }

sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory

Lines changed: 0 additions & 1 deletion
This file was deleted.
sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin (new file)
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLAppStatusPlugin
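This one-line resource file is what lets AppStatusPlugin.loadPlugins() find the SQL plugin: java.util.ServiceLoader reads META-INF/services/<interface-FQN> files from the classpath and instantiates every class listed in them. A self-contained sketch of the mechanism, with hypothetical GreeterPlugin/EnglishGreeter names standing in for AppStatusPlugin/SQLAppStatusPlugin:

import java.util.ServiceLoader

import scala.collection.JavaConverters._

trait GreeterPlugin { def greet(): String }

// Service providers need a public no-arg constructor.
class EnglishGreeter extends GreeterPlugin { override def greet(): String = "hello" }

object ServiceLoaderSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a classpath resource META-INF/services/GreeterPlugin containing
    // the single line "EnglishGreeter"; without it, the loader finds nothing.
    val plugins = ServiceLoader.load(classOf[GreeterPlugin]).asScala
    plugins.foreach(p => println(p.greet()))
  }
}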

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Lines changed: 0 additions & 5 deletions

@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.internal._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.BaseRelation

@@ -957,7 +956,6 @@ object SparkSession {
       sparkContext.addSparkListener(new SparkListener {
         override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
           defaultSession.set(null)
-          sqlListener.set(null)
         }
       })
     }

@@ -1026,9 +1024,6 @@ object SparkSession {
    */
   def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)

-  /** A global SQL listener used for the SQL UI. */
-  private[sql] val sqlListener = new AtomicReference[SQLListener]()
-
   ////////////////////////////////////////////////////////////////////////////////////////
   // Private methods from now on
   ////////////////////////////////////////////////////////////////////////////////////////

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
Lines changed: 51 additions & 35 deletions

@@ -24,34 +24,54 @@ import scala.xml.{Node, NodeSeq}

 import org.apache.commons.lang3.StringEscapeUtils

+import org.apache.spark.JobExecutionStatus
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}

 private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {

-  private val listener = parent.listener
+  private val sqlStore = parent.sqlStore

   override def render(request: HttpServletRequest): Seq[Node] = {
     val currentTime = System.currentTimeMillis()
-    val content = listener.synchronized {
+    val running = new mutable.ArrayBuffer[SQLExecutionUIData]()
+    val completed = new mutable.ArrayBuffer[SQLExecutionUIData]()
+    val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
+
+    sqlStore.executionsList().foreach { e =>
+      val isRunning = e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
+      val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+      if (isRunning) {
+        running += e
+      } else if (isFailed) {
+        failed += e
+      } else {
+        completed += e
+      }
+    }
+
+    val content = {
       val _content = mutable.ListBuffer[Node]()
-      if (listener.getRunningExecutions.nonEmpty) {
+
+      if (running.nonEmpty) {
         _content ++=
           new RunningExecutionTable(
-            parent, s"Running Queries (${listener.getRunningExecutions.size})", currentTime,
-            listener.getRunningExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+            parent, s"Running Queries (${running.size})", currentTime,
+            running.sortBy(_.submissionTime).reverse).toNodeSeq
       }
-      if (listener.getCompletedExecutions.nonEmpty) {
+
+      if (completed.nonEmpty) {
         _content ++=
           new CompletedExecutionTable(
-            parent, s"Completed Queries (${listener.getCompletedExecutions.size})", currentTime,
-            listener.getCompletedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+            parent, s"Completed Queries (${completed.size})", currentTime,
+            completed.sortBy(_.submissionTime).reverse).toNodeSeq
       }
-      if (listener.getFailedExecutions.nonEmpty) {
+
+      if (failed.nonEmpty) {
         _content ++=
           new FailedExecutionTable(
-            parent, s"Failed Queries (${listener.getFailedExecutions.size})", currentTime,
-            listener.getFailedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+            parent, s"Failed Queries (${failed.size})", currentTime,
+            failed.sortBy(_.submissionTime).reverse).toNodeSeq
       }
       _content
     }

@@ -65,26 +85,26 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
     <div>
       <ul class="unstyled">
         {
-          if (listener.getRunningExecutions.nonEmpty) {
+          if (running.nonEmpty) {
             <li>
               <a href="#running-execution-table"><strong>Running Queries:</strong></a>
-              {listener.getRunningExecutions.size}
+              {running.size}
             </li>
           }
         }
         {
-          if (listener.getCompletedExecutions.nonEmpty) {
+          if (completed.nonEmpty) {
             <li>
               <a href="#completed-execution-table"><strong>Completed Queries:</strong></a>
-              {listener.getCompletedExecutions.size}
+              {completed.size}
             </li>
           }
         }
         {
-          if (listener.getFailedExecutions.nonEmpty) {
+          if (failed.nonEmpty) {
             <li>
               <a href="#failed-execution-table"><strong>Failed Queries:</strong></a>
-              {listener.getFailedExecutions.size}
+              {failed.size}
             </li>
           }
         }

@@ -114,23 +134,19 @@ private[ui] abstract class ExecutionTable(

   protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): Seq[Node] = {
     val submissionTime = executionUIData.submissionTime
-    val duration = executionUIData.completionTime.getOrElse(currentTime) - submissionTime
+    val duration = executionUIData.completionTime.map(_.getTime()).getOrElse(currentTime) -
+      submissionTime

-    val runningJobs = executionUIData.runningJobs.map { jobId =>
-      <a href={jobURL(jobId)}>
-        [{jobId.toString}]
-      </a>
-    }
-    val succeededJobs = executionUIData.succeededJobs.sorted.map { jobId =>
-      <a href={jobURL(jobId)}>
-        [{jobId.toString}]
-      </a>
-    }
-    val failedJobs = executionUIData.failedJobs.sorted.map { jobId =>
-      <a href={jobURL(jobId)}>
-        [{jobId.toString}]
-      </a>
+    def jobLinks(status: JobExecutionStatus): Seq[Node] = {
+      executionUIData.jobs.flatMap { case (jobId, jobStatus) =>
+        if (jobStatus == status) {
+          <a href={jobURL(jobId)}>[{jobId.toString}]</a>
+        } else {
+          None
+        }
+      }.toSeq
     }
+
     <tr>
       <td>
         {executionUIData.executionId.toString}

@@ -146,17 +162,17 @@
       </td>
       {if (showRunningJobs) {
         <td>
-          {runningJobs}
+          {jobLinks(JobExecutionStatus.RUNNING)}
         </td>
       }}
       {if (showSucceededJobs) {
         <td>
-          {succeededJobs}
+          {jobLinks(JobExecutionStatus.SUCCEEDED)}
         </td>
       }}
       {if (showFailedJobs) {
         <td>
-          {failedJobs}
+          {jobLinks(JobExecutionStatus.FAILED)}
         </td>
       }}
     </tr>
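The page-level grouping above replaces the old listener's precomputed running/completed/failed lists with a rule derived from the per-job statuses stored with each execution: running takes precedence over failed, and failed over completed. A self-contained restatement of that rule (ClassifySketch is illustrative; the Map models the SQLExecutionUIData.jobs field, job id to status):

import org.apache.spark.JobExecutionStatus

object ClassifySketch {
  // An execution is running if any of its jobs is still running; otherwise
  // failed if any job failed; otherwise completed. This mirrors the foreach
  // over sqlStore.executionsList() in render() above.
  def classify(jobs: Map[Int, JobExecutionStatus]): String = {
    if (jobs.values.exists(_ == JobExecutionStatus.RUNNING)) "running"
    else if (jobs.values.exists(_ == JobExecutionStatus.FAILED)) "failed"
    else "completed"
  }

  def main(args: Array[String]): Unit = {
    println(classify(Map(1 -> JobExecutionStatus.SUCCEEDED, 2 -> JobExecutionStatus.RUNNING))) // running
    println(classify(Map(1 -> JobExecutionStatus.FAILED, 2 -> JobExecutionStatus.SUCCEEDED)))  // failed
    println(classify(Map(1 -> JobExecutionStatus.SUCCEEDED)))                                  // completed
  }
}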
