15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -54,7 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -246,6 +246,8 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def isStopped: Boolean = stopped.get()

private[spark] def statusStore: AppStatusStore = _statusStore

// An asynchronous listener bus for Spark events
private[spark] def listenerBus: LiveListenerBus = _listenerBus

@@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging {
// For tests, do not enable the UI
None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
_ui.foreach { ui =>
// Load any plugins that might want to modify the UI.
AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
Review comment (Contributor): @vanzin the live UI doesn't need a two-step process to set up the UI, while the history server does. That's why I think they should not share one plugin interface.

Review reply (Contributor, PR author): Let's continue the discussion on the other PR.


// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
ui.bind()
}

_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
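
To make the review discussion above concrete: the live application and the history server exercise the same two plugin hooks at different points in the lifecycle. A sketch assembled only from pieces of this diff:

    // How the two call sites drive AppStatusPlugin (sketch from this PR):
    //
    // Live application (SparkContext, this hunk plus AppStatusStore below):
    //   AppStatusStore.createLiveStore(conf, addListenerFn)
    //     -> plugin.setupListeners(conf, store, addListenerFn, live = true)
    //   AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))  // before ui.bind()
    //
    // History server (FsHistoryProvider, later in this diff):
    //   plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), live = false)
    //   plugin.setupUI(loadedUI.ui)  // after the event log has been replayed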

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -41,7 +41,7 @@ import org.apache.spark.deploy.history.config._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status.{AppStatusListener, AppStatusStore, AppStatusStoreMetadata, KVUtils}
import org.apache.spark.status._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1
import org.apache.spark.ui.SparkUI
@@ -319,6 +319,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val _listener = new AppStatusListener(kvstore, conf, false,
lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
replayBus.addListener(_listener)
AppStatusPlugin.loadPlugins().foreach { plugin =>
plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
}
Some(_listener)
} else {
None
@@ -333,11 +336,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

try {
val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
Utils.getContextOrSparkClassLoader).asScala
listenerFactories.foreach { listenerFactory =>
val listeners = listenerFactory.createListeners(conf, loadedUI.ui)
listeners.foreach(replayBus.addListener)
AppStatusPlugin.loadPlugins().foreach { plugin =>
plugin.setupUI(loadedUI.ui)
}

val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
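
From the plugin's side of this replay contract, a listener registered through addListenerFn receives the replayed events and persists derived state into the shared kvstore that the rebuilt UI later reads. A minimal hypothetical sketch (ExampleJobCount and ExampleJobCountListener are illustrations, not part of this PR; it assumes placement under org.apache.spark, since these APIs are private[spark]):

    package org.apache.spark.status

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
    import org.apache.spark.util.kvstore.{KVIndex, KVStore}

    // Hypothetical record type; @KVIndex marks the natural key KVStore requires.
    private class ExampleJobCount(val count: Int) {
      @KVIndex def id: String = "example-job-count"
    }

    // Hypothetical listener: counts replayed jobs and writes the running
    // total into the same store the rebuilt UI will read from.
    private class ExampleJobCountListener(store: KVStore) extends SparkListener {
      private var jobs = 0
      override def onJobStart(event: SparkListenerJobStart): Unit = {
        jobs += 1
        store.write(new ExampleJobCount(jobs))
      }
    }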
12 changes: 0 additions & 12 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -167,18 +167,6 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

/**
* Interface for creating history listeners defined in other modules like SQL, which are used to
* rebuild the history UI.
*/
private[spark] trait SparkHistoryListenerFactory {
/**
* Create listeners used to rebuild the history UI.
*/
def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
}


/**
* Interface for listening to events from the Spark scheduler. Most applications should probably
* extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
71 changes: 71 additions & 0 deletions core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.KVStore

/**
* An interface that defines plugins for collecting and storing application state.
*
* The plugin implementations are invoked for both live and replayed applications. For live
* applications, it's recommended that plugins defer creation of UI tabs until there's actual
* data to be shown.
*/
private[spark] trait AppStatusPlugin {

/**
* Install listeners to collect data about the running application and populate the given
* store.
*
* @param conf The Spark configuration.
* @param store The KVStore in which to keep application data.
* @param addListenerFn Function to register listeners with a bus.
* @param live Whether this is a live application (or an application being replayed by the
* HistoryServer).
*/
def setupListeners(
conf: SparkConf,
store: KVStore,
addListenerFn: SparkListener => Unit,
live: Boolean): Unit

/**
* Install any needed extensions (tabs, pages, etc.) to a Spark UI. The plugin can detect whether
* the app is live or replayed by looking at the UI's SparkContext field `sc`.
*
* @param ui The Spark UI instance for the application.
*/
def setupUI(ui: SparkUI): Unit

}

private[spark] object AppStatusPlugin {

def loadPlugins(): Iterable[AppStatusPlugin] = {
ServiceLoader.load(classOf[AppStatusPlugin], Utils.getContextOrSparkClassLoader).asScala
}

}
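
Since discovery goes through ServiceLoader, a complete plugin is an implementation of both hooks plus a provider-configuration entry. A hedged skeleton (ExamplePlugin and ExampleTab are hypothetical; a real implementation must live under org.apache.spark because the trait is private[spark]):

    package org.apache.spark.status

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.ui.{SparkUI, SparkUITab}
    import org.apache.spark.util.kvstore.KVStore

    // Hypothetical tab with no pages, just to make the sketch concrete.
    private class ExampleTab(ui: SparkUI) extends SparkUITab(ui, "example")

    class ExamplePlugin extends AppStatusPlugin {

      override def setupListeners(
          conf: SparkConf,
          store: KVStore,
          addListenerFn: SparkListener => Unit,
          live: Boolean): Unit = {
        // The caller decides which bus this lands on (live bus or replay
        // bus); a no-op listener keeps the sketch compilable.
        addListenerFn(new SparkListener {})
      }

      override def setupUI(ui: SparkUI): Unit = {
        if (ui.sc.isDefined) {
          // Live application: per the scaladoc above, defer tab creation
          // until the listener has actually recorded data.
        } else {
          // History server replay: the store is already populated.
          ui.attachTab(new ExampleTab(ui))
        }
      }
    }

Registration then happens through a one-line provider-configuration file, like the one the SQL module adds later in this diff.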
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
/**
* A wrapper around a KVStore that provides methods for accessing the API data stored within.
*/
private[spark] class AppStatusStore(store: KVStore) {
private[spark] class AppStatusStore(val store: KVStore) {

def applicationInfo(): v1.ApplicationInfo = {
store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
@@ -338,9 +338,11 @@ private[spark] object AppStatusStore {
*/
def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
val store = new InMemoryStore()
val stateStore = new AppStatusStore(store)
addListenerFn(new AppStatusListener(store, conf, true))
stateStore
AppStatusPlugin.loadPlugins().foreach { p =>
p.setupListeners(conf, store, addListenerFn, true)
}
new AppStatusStore(store)
}

}
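
For the live path, everything the core listener and the plugins register funnels through the single addListenerFn callback. A hypothetical call-site shape (the actual bus-registration function in SparkContext is not shown in this excerpt, so `register` stands in for it):

    package org.apache.spark.status

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SparkListener

    object ExampleWiring {
      // `register` is whatever puts a listener on the application's live bus.
      def wireStatusStore(conf: SparkConf, register: SparkListener => Unit): AppStatusStore =
        AppStatusStore.createLiveStore(conf, register)
    }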

This file was deleted.

sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
@@ -0,0 +1 @@
org.apache.spark.sql.execution.ui.SQLAppStatusPlugin
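
This one-line file is the ServiceLoader provider-configuration resource: it must be named after the interface's fully qualified name (org.apache.spark.status.AppStatusPlugin), and each non-comment line names one concrete implementation, so several plugins can be registered side by side. For example (second entry hypothetical, reusing the sketch above):

    org.apache.spark.sql.execution.ui.SQLAppStatusPlugin
    # org.apache.spark.status.ExamplePlugin  (hypothetical second plugin)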
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.internal._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.BaseRelation
@@ -957,7 +956,6 @@ object SparkSession {
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
defaultSession.set(null)
sqlListener.set(null)
}
})
}
@@ -1026,9 +1024,6 @@
*/
def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)

/** A global SQL listener used for the SQL UI. */
private[sql] val sqlListener = new AtomicReference[SQLListener]()

////////////////////////////////////////////////////////////////////////////////////////
// Private methods from now on
////////////////////////////////////////////////////////////////////////////////////////
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -24,34 +24,54 @@ import scala.xml.{Node, NodeSeq}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.JobExecutionStatus
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{UIUtils, WebUIPage}

private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {

private val listener = parent.listener
private val sqlStore = parent.sqlStore

override def render(request: HttpServletRequest): Seq[Node] = {
val currentTime = System.currentTimeMillis()
val content = listener.synchronized {
val running = new mutable.ArrayBuffer[SQLExecutionUIData]()
val completed = new mutable.ArrayBuffer[SQLExecutionUIData]()
val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()

sqlStore.executionsList().foreach { e =>
val isRunning = e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
if (isRunning) {
running += e
} else if (isFailed) {
failed += e
} else {
completed += e
}
}

val content = {
val _content = mutable.ListBuffer[Node]()
if (listener.getRunningExecutions.nonEmpty) {

if (running.nonEmpty) {
_content ++=
new RunningExecutionTable(
parent, s"Running Queries (${listener.getRunningExecutions.size})", currentTime,
listener.getRunningExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
parent, s"Running Queries (${running.size})", currentTime,
running.sortBy(_.submissionTime).reverse).toNodeSeq
}
if (listener.getCompletedExecutions.nonEmpty) {

if (completed.nonEmpty) {
_content ++=
new CompletedExecutionTable(
parent, s"Completed Queries (${listener.getCompletedExecutions.size})", currentTime,
listener.getCompletedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
parent, s"Completed Queries (${completed.size})", currentTime,
completed.sortBy(_.submissionTime).reverse).toNodeSeq
}
if (listener.getFailedExecutions.nonEmpty) {

if (failed.nonEmpty) {
_content ++=
new FailedExecutionTable(
parent, s"Failed Queries (${listener.getFailedExecutions.size})", currentTime,
listener.getFailedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
parent, s"Failed Queries (${failed.size})", currentTime,
failed.sortBy(_.submissionTime).reverse).toNodeSeq
}
_content
}
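
The loop above derives each execution's bucket from its jobs map instead of reading the old listener's precomputed lists. Distilled into a standalone sketch (the Int-keyed map shape is assumed from the surrounding code):

    import org.apache.spark.JobExecutionStatus

    object ExecutionBuckets {
      // Bucketing rule from render(): an execution is running if any job is
      // RUNNING; otherwise failed if any job FAILED; otherwise completed.
      def classify(jobs: Map[Int, JobExecutionStatus]): String = {
        if (jobs.values.exists(_ == JobExecutionStatus.RUNNING)) "running"
        else if (jobs.values.exists(_ == JobExecutionStatus.FAILED)) "failed"
        else "completed"
      }
    }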
@@ -65,26 +85,26 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging
<div>
<ul class="unstyled">
{
if (listener.getRunningExecutions.nonEmpty) {
if (running.nonEmpty) {
<li>
<a href="#running-execution-table"><strong>Running Queries:</strong></a>
{listener.getRunningExecutions.size}
{running.size}
</li>
}
}
{
if (listener.getCompletedExecutions.nonEmpty) {
if (completed.nonEmpty) {
<li>
<a href="#completed-execution-table"><strong>Completed Queries:</strong></a>
{listener.getCompletedExecutions.size}
{completed.size}
</li>
}
}
{
if (listener.getFailedExecutions.nonEmpty) {
if (failed.nonEmpty) {
<li>
<a href="#failed-execution-table"><strong>Failed Queries:</strong></a>
{listener.getFailedExecutions.size}
{failed.size}
</li>
}
}
@@ -114,23 +134,19 @@ private[ui] abstract class ExecutionTable(

protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): Seq[Node] = {
val submissionTime = executionUIData.submissionTime
val duration = executionUIData.completionTime.getOrElse(currentTime) - submissionTime
val duration = executionUIData.completionTime.map(_.getTime()).getOrElse(currentTime) -
submissionTime

val runningJobs = executionUIData.runningJobs.map { jobId =>
<a href={jobURL(jobId)}>
[{jobId.toString}]
</a>
}
val succeededJobs = executionUIData.succeededJobs.sorted.map { jobId =>
<a href={jobURL(jobId)}>
[{jobId.toString}]
</a>
}
val failedJobs = executionUIData.failedJobs.sorted.map { jobId =>
<a href={jobURL(jobId)}>
[{jobId.toString}]
</a>
def jobLinks(status: JobExecutionStatus): Seq[Node] = {
executionUIData.jobs.flatMap { case (jobId, jobStatus) =>
if (jobStatus == status) {
<a href={jobURL(jobId)}>[{jobId.toString}]</a>
} else {
None
}
}.toSeq
}

<tr>
<td>
{executionUIData.executionId.toString}
Expand All @@ -146,17 +162,17 @@ private[ui] abstract class ExecutionTable(
</td>
{if (showRunningJobs) {
<td>
{runningJobs}
{jobLinks(JobExecutionStatus.RUNNING)}
</td>
}}
{if (showSucceededJobs) {
<td>
{succeededJobs}
{jobLinks(JobExecutionStatus.SUCCEEDED)}
</td>
}}
{if (showFailedJobs) {
<td>
{failedJobs}
{jobLinks(JobExecutionStatus.FAILED)}
</td>
}}
</tr>
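
Two smaller changes in row(...) are worth spelling out: job links are now derived per status from the single jobs map, and the duration computation handles a still-running execution whose completionTime is absent. The duration rule as a standalone sketch (the Option[java.util.Date] type is inferred from the .map(_.getTime()) call above):

    object ExecutionDuration {
      // A finished execution measures to its completion time; a running one
      // measures against "now".
      def durationMs(
          completionTime: Option[java.util.Date],
          submissionTime: Long,
          currentTime: Long): Long =
        completionTime.map(_.getTime()).getOrElse(currentTime) - submissionTime
    }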