Commit b6e9963

carsonwang authored and Marcelo Vanzin committed

[SPARK-11206] Support SQL UI on the history server (resubmit)

Resubmit #9297 and #9991. On the live web UI there is a SQL tab that provides valuable information about SQL queries, but once the workload finishes, the SQL tab is not available on the history server. It will be helpful if we support the SQL UI on the history server so we can analyze a query even after its execution. To support SQL UI on the history server:

1. I added an onOtherEvent method to the SparkListener trait and posted all SQL-related events to the same event bus.
2. Two SQL events, SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd, are defined in the sql module.
3. The new SQL events are written to the event log using Jackson.
4. A new trait, SparkHistoryListenerFactory, is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using java.util.ServiceLoader.

Author: Carson Wang <[email protected]>

Closes #10061 from carsonwang/SqlHistoryUI.
1 parent f434f36 commit b6e9963

File tree: 21 files changed, +329 −135 lines changed

.rat-excludes

Lines changed: 1 addition & 0 deletions

```diff
@@ -82,4 +82,5 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
+org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet
```

core/src/main/java/org/apache/spark/JavaSparkListener.java

Lines changed: 3 additions & 0 deletions

```diff
@@ -82,4 +82,7 @@ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
   @Override
   public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
 
+  @Override
+  public void onOtherEvent(SparkListenerEvent event) { }
+
 }
```

core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 4 additions & 0 deletions

```diff
@@ -118,4 +118,8 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
     onEvent(blockUpdated);
   }
 
+  @Override
+  public void onOtherEvent(SparkListenerEvent event) {
+    onEvent(event);
+  }
 }
```

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 4 additions & 0 deletions

```diff
@@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
 
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   /**
    * Stop logging events. The event log file will be renamed so that it loses the
    * ".inprogress" suffix.
```

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 22 additions & 2 deletions

```diff
@@ -22,15 +22,19 @@ import java.util.Properties
 import scala.collection.Map
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, TaskEndReason}
+import com.fasterxml.jackson.annotation.JsonTypeInfo
+
+import org.apache.spark.{Logging, SparkConf, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.ui.SparkUI
 
 @DeveloperApi
-sealed trait SparkListenerEvent
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
+trait SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
@@ -130,6 +134,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
  */
 private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
+/**
+ * Interface for creating history listeners defined in other modules like SQL, which are used to
+ * rebuild the history UI.
+ */
+private[spark] trait SparkHistoryListenerFactory {
+  /**
+   * Create listeners used to rebuild the history UI.
+   */
+  def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
+}
+
 /**
  * :: DeveloperApi ::
  * Interface for listening to events from the Spark scheduler. Note that this is an internal
@@ -223,6 +238,11 @@ trait SparkListener {
    * Called when the driver receives a block update info.
    */
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
+
+  /**
+   * Called when other events like SQL-specific events are posted.
+   */
+  def onOtherEvent(event: SparkListenerEvent) { }
 }
```
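For context, this is roughly how a downstream module plugs into the new hook. A minimal sketch, not part of the patch: `DemoHistoryListener` and the `org.apache.spark.demo` package are hypothetical, and because the trait is private[spark], a real implementation must live under org.apache.spark, as the SQL module's SQLHistoryListenerFactory does.

```scala
package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkHistoryListenerFactory, SparkListener, SparkListenerEvent}
import org.apache.spark.ui.SparkUI

// Hypothetical listener that rebuilds module-specific UI state from replayed events.
class DemoHistoryListener(conf: SparkConf, ui: SparkUI) extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    // Match on the module's own event types here and update UI state.
  }
}

// Discovered by the history server via java.util.ServiceLoader, so it needs
// a public no-arg constructor.
class DemoHistoryListenerFactory extends SparkHistoryListenerFactory {
  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] =
    Seq(new DemoHistoryListener(conf, sparkUI))
}
```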

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
       case blockUpdated: SparkListenerBlockUpdated =>
         listener.onBlockUpdated(blockUpdated)
       case logStart: SparkListenerLogStart => // ignore event log metadata
+      case _ => listener.onOtherEvent(event)
     }
   }
```
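This catch-all case is what delivers the new SQL events: anything the bus does not recognize goes to onOtherEvent, and each listener matches on the types it cares about. A hedged sketch of the consuming side (the event classes live in the sql module; the import path and their fields are assumed here, not shown in this diff):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Sketch: count SQL executions currently in flight. Events arrive through
// the catch-all case above, on the single listener-bus thread.
class SqlExecutionCounter extends SparkListener {
  private var running = 0

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case _: SparkListenerSQLExecutionStart => running += 1
    case _: SparkListenerSQLExecutionEnd => running -= 1
    case _ => // events from other modules; ignore
  }
}
```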

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 14 additions & 2 deletions

```diff
@@ -17,10 +17,13 @@
 
 package org.apache.spark.ui
 
-import java.util.Date
+import java.util.{Date, ServiceLoader}
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
   UIRoot}
+import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
@@ -154,7 +157,16 @@ private[spark] object SparkUI {
       appName: String,
       basePath: String,
       startTime: Long): SparkUI = {
-    create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+    val sparkUI = create(
+      None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+
+    val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
+      Utils.getContextOrSparkClassLoader).asScala
+    listenerFactories.foreach { listenerFactory =>
+      val listeners = listenerFactory.createListeners(conf, sparkUI)
+      listeners.foreach(listenerBus.addListener)
+    }
+    sparkUI
   }
 
   /**
```
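The history-UI code path relies on the standard java.util.ServiceLoader contract: a factory is discovered only if a jar on the classpath carries a provider-configuration file named after the fully qualified trait, i.e. META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory. For the hypothetical factory sketched earlier, that file would contain one line (class name illustrative):

```
org.apache.spark.demo.DemoHistoryListenerFactory
```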

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 9 additions & 2 deletions

```diff
@@ -19,19 +19,21 @@ package org.apache.spark.util
 
 import java.util.{Properties, UUID}
 
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
 
 /**
@@ -54,6 +56,8 @@ private[spark] object JsonProtocol {
 
   private implicit val format = DefaultFormats
 
+  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
@@ -96,6 +100,7 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdated: SparkListenerBlockUpdated =>
         throw new MatchError(blockUpdated) // TODO(ekl) implement this
+      case _ => parse(mapper.writeValueAsString(event))
     }
   }
 
@@ -506,6 +511,8 @@ private[spark] object JsonProtocol {
       case `executorRemoved` => executorRemovedFromJson(json)
       case `logStart` => logStartFromJson(json)
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+      case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+        .asInstanceOf[SparkListenerEvent]
     }
   }
```
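Together with the @JsonTypeInfo annotation added to SparkListenerEvent, these two catch-all cases give unknown events a generic round trip: Jackson writes the concrete class name under an "Event" property, and the read path resolves that name back to a class. A self-contained sketch of the mechanism, using hypothetical Demo types rather than Spark's:

```scala
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Stand-in for SparkListenerEvent, carrying the same annotation.
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait DemoEvent

// Stand-in for a module-defined event such as SparkListenerSQLExecutionStart.
case class DemoExecutionStart(executionId: Long, description: String) extends DemoEvent

object DemoRoundTrip extends App {
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Serialization embeds the fully qualified class name, e.g.
  // {"Event":"DemoExecutionStart","executionId":0,"description":"count"}
  val json = mapper.writeValueAsString(DemoExecutionStart(0L, "count"))

  // Deserialization resolves the concrete class from the "Event" property,
  // mirroring the Utils.classForName call in the catch-all read case above.
  val back = mapper.readValue(json, classOf[DemoEvent])
  assert(back == DemoExecutionStart(0L, "count"))
}
```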

sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory

Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory
```

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 15 additions & 4 deletions

```diff
@@ -1245,6 +1245,7 @@ class SQLContext private[sql](
     sparkContext.addSparkListener(new SparkListener {
       override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
         SQLContext.clearInstantiatedContext()
+        SQLContext.clearSqlListener()
       }
     })
 
@@ -1272,6 +1273,8 @@ object SQLContext {
    */
   @transient private val instantiatedContext = new AtomicReference[SQLContext]()
 
+  @transient private val sqlListener = new AtomicReference[SQLListener]()
+
   /**
    * Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
    *
@@ -1316,6 +1319,10 @@ object SQLContext {
     Option(instantiatedContext.get())
   }
 
+  private[sql] def clearSqlListener(): Unit = {
+    sqlListener.set(null)
+  }
+
   /**
    * Changes the SQLContext that will be returned in this thread and its children when
    * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1364,9 +1371,13 @@ object SQLContext {
    * Create a SQLListener then add it into SparkContext, and create an SQLTab if there is SparkUI.
    */
   private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
-    val listener = new SQLListener(sc.conf)
-    sc.addSparkListener(listener)
-    sc.ui.foreach(new SQLTab(listener, _))
-    listener
+    if (sqlListener.get() == null) {
+      val listener = new SQLListener(sc.conf)
+      if (sqlListener.compareAndSet(null, listener)) {
+        sc.addSparkListener(listener)
+        sc.ui.foreach(new SQLTab(listener, _))
+      }
+    }
+    sqlListener.get()
   }
 }
```
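The new createListenerAndUI uses a standard AtomicReference get-or-create idiom: any thread may construct a candidate listener, but only the compareAndSet winner registers it, so a single SQLListener (and SQL tab) is shared per JVM. The idiom in isolation, as a minimal sketch rather than Spark code:

```scala
import java.util.concurrent.atomic.AtomicReference

// Publish at most one instance; run one-time registration exactly once.
class LazySingleton[T <: AnyRef] {
  private val ref = new AtomicReference[T]()

  def getOrCreate(make: => T)(register: T => Unit): T = {
    if (ref.get() == null) {
      val candidate = make
      // Losers of the race discard their candidate and fall through
      // to the instance the winner published.
      if (ref.compareAndSet(null, candidate)) {
        register(candidate)
      }
    }
    ref.get()
  }
}
```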
