core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -73,4 +73,6 @@ private[history] abstract class ApplicationHistoryProvider {
@throws(classOf[SparkException])
def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit

/** Returns whether the application whose event log has the given name has completed. */
def getAppStatus(appId: String): Boolean

}
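
The contract implied by the FsHistoryProvider change below: the argument is an event log name, the result says whether that application has completed, and an unknown name raises a NoSuchElementException. A minimal, self-contained sketch of the lookup (the map and the IN_PROGRESS constant here stand in for the provider's bookkeeping and EventLoggingListener.IN_PROGRESS; none of these names are part of the patch):

import scala.collection.mutable

object GetAppStatusSketch {
  private val IN_PROGRESS = ".inProgress" // stand-in for EventLoggingListener.IN_PROGRESS
  // Completed logs are keyed by their plain name, running ones by name + suffix.
  private val appStatus = mutable.HashMap(
    "app-001" -> true,
    "app-002" + IN_PROGRESS -> false)

  def getAppStatus(appId: String): Boolean =
    if (appStatus.contains(appId)) true
    else if (appStatus.contains(appId + IN_PROGRESS)) false
    else throw new NoSuchElementException(s"no app with key $appId.")

  def main(args: Array[String]): Unit = {
    println(getAppStatus("app-001")) // true: completed
    println(getAppStatus("app-002")) // false: still in progress
  }
}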
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,6 +83,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]

// Completion status of each replayed event log, keyed by log file name
// (logs of running applications keep their EventLoggingListener.IN_PROGRESS suffix).
private val appStatus = new mutable.HashMap[String, Boolean]()

// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
@@ -445,6 +447,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
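// Record whether this log was complete at replay time; the HistoryServer
// consults this through getAppStatus() to decide when a cached UI is stale.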
appStatus.put(logPath.getName(), appCompleted)
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
@@ -529,6 +532,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

def getAppStatus(appId: String): Boolean = {
  // Completed logs are stored under their plain name; running applications keep
  // the EventLoggingListener.IN_PROGRESS suffix on theirs.
  if (appStatus.contains(appId)) {
    true
  } else if (appStatus.contains(appId + EventLoggingListener.IN_PROGRESS)) {
    false
  } else {
    val e = new NoSuchElementException(s"no app with key $appId.")
    // Attach a NoSuchElementException cause so that HistoryServer.loadAppUi's
    // e.getCause() match treats a missing app the same way as a cache miss.
    e.initCause(new NoSuchElementException)
    throw e
  }
}

}

private object FsHistoryProvider {
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -32,6 +32,8 @@ import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}

import scala.collection.mutable

/**
* A web server that renders SparkUIs of completed applications.
*
@@ -50,6 +52,8 @@ class HistoryServer(
port: Int)
extends WebUI(securityManager, port, conf) with Logging with UIRoot {

// Completion status of each loaded app, keyed by "appId" or "appId_attemptId";
// entries marked incomplete trigger a cache refresh on the next UI request.
private val loadedAppStatus = new mutable.HashMap[String, Boolean]()

// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)

@@ -103,7 +107,7 @@ class HistoryServer(
// Note we don't use the UI retrieved from the cache; the cache loader above will register
// the app's UI, and all we need to do is redirect the user to the same URI that was
// requested, and the proper data should be served at that point.
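// With this change the redirect gains a "/ui" suffix, matching the handler
// that SparkUI registers when it has no SparkContext (see the SparkUI.scala
// change below), so the browser ends up on the app's /jobs page.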
res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
res.sendRedirect(res.encodeRedirectURL(req.getRequestURI() + "/ui"))
}

// SPARK-5983 ensure TRACE is not supported
@@ -190,7 +194,14 @@

private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = {
try {
appCache.get(appId + attemptId.map { id => s"/$id" }.getOrElse(""))
val appAttemptId = appId + attemptId.map { id => s"_$id" }.getOrElse("")
val cacheKey = appId + attemptId.map { id => s"/$id" }.getOrElse("")
// Refresh when the app has not been loaded yet, or when it was last seen as
// incomplete: its event log may have grown, so the cached UI could be stale.
if (!loadedAppStatus.getOrElse(appAttemptId, false)) {
  loadedAppStatus.put(appAttemptId, provider.getAppStatus(appAttemptId))
  appCache.refresh(cacheKey)
}
true
} catch {
case e: Exception => e.getCause() match {
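Why loadAppUi now calls refresh rather than get for incomplete apps: appCache is a Guava LoadingCache whose loader replays the event log and registers the resulting SparkUI, so get would keep serving a stale UI while refresh re-runs the loader. A minimal, self-contained sketch of that distinction (object and key names are illustrative, not part of the patch):

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RefreshVsGet {
  private var loads = 0
  // The loader counts its invocations so the difference is observable.
  private val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
      override def load(key: String): String = { loads += 1; s"$key (load #$loads)" }
    })

  def main(args: Array[String]): Unit = {
    println(cache.get("app-001"))   // load #1
    println(cache.get("app-001"))   // still load #1: served from cache
    cache.refresh("app-001")        // re-runs the loader (synchronously by default)
    println(cache.get("app-001"))   // load #2: a freshly built value
  }
}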
core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -64,7 +64,11 @@ private[spark] class SparkUI private (
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
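// sc is defined only for a live application's UI; the history server builds
// SparkUIs without a SparkContext. A history UI keeps "/" free for the
// HistoryServer's own pages and instead answers on the "/ui" path that the
// HistoryServer change above now redirects to.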
if (sc.isDefined) {
  attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
} else {
  attachHandler(createRedirectHandler("/ui", "/jobs", basePath = basePath))
}
attachHandler(ApiRootResource.getServletHandler(this))
// This should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(