Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,21 @@ private[spark] class AppStateListener(
}

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
  // One environment snapshot is reported per application; persist it under its
  // fixed key so the REST API and UI can read it back later.
  val env = event.environmentDetails

  // "JVM Information" arrives as a sequence of key/value pairs; index it for lookup.
  val jvmProps = env("JVM Information").toMap
  val runtimeInfo = new v1.RuntimeInfo(
    jvmProps("Java Version"),
    jvmProps("Java Home"),
    jvmProps("Scala Version"))

  kvstore.write(new ApplicationEnvironmentInfoWrapper(
    new v1.ApplicationEnvironmentInfo(
      runtimeInfo,
      env("Spark Properties"),
      env("System Properties"),
      env("Classpath Entries"))))
}

override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import org.apache.spark.util.{Distribution, Utils}
*/
private[spark] class AppStateStore(store: KVStore) {

/**
 * Reads back the single environment snapshot stored for this application.
 * The record is keyed by the wrapper's class name (see its `id` method).
 */
def environmentInfo(): v1.ApplicationEnvironmentInfo = {
  val wrapperClass = classOf[ApplicationEnvironmentInfoWrapper]
  store.read(wrapperClass, wrapperClass.getName()).info
}

def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = {
val it = store.view(classOf[JobDataWrapper]).sorted().asScala.map(_.info)
if (!statuses.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,7 @@ private[v1] class ApplicationEnvironmentResource(ui: SparkUI) {

@GET
def getEnvironmentInfo(): ApplicationEnvironmentInfo = {
val listener = ui.environmentListener
listener.synchronized {
val jvmInfo = Map(listener.jvmInformation: _*)
val runtime = new RuntimeInfo(
jvmInfo("Java Version"),
jvmInfo("Java Home"),
jvmInfo("Scala Version"))

new ApplicationEnvironmentInfo(
runtime,
listener.sparkProperties,
listener.systemProperties,
listener.classpathEntries)
}
ui.store.environmentInfo()
}

}
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) {

}

/**
 * KVStore wrapper for the application's environment snapshot
 * ([[ApplicationEnvironmentInfo]]). Stored under a fixed key — the wrapper's
 * class name — since there is exactly one such record per application.
 */
private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvironmentInfo) {

/**
 * There's always a single ApplicationEnvironmentInfo object per application, so this
 * ID doesn't need to be dynamic. But the KVStore API requires an ID.
 */
@JsonIgnore @KVIndex
def id: String = classOf[ApplicationEnvironmentInfoWrapper].getName()

}

private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {

@JsonIgnore @KVIndex
Expand Down
23 changes: 12 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStateStore
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
UIRoot}
import org.apache.spark.status.api.v1._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
import org.apache.spark.ui.env.EnvironmentTab
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
import org.apache.spark.ui.scope.RDDOperationGraphListener
Expand All @@ -44,7 +43,6 @@ private[spark] class SparkUI private (
val sc: Option[SparkContext],
val conf: SparkConf,
securityManager: SecurityManager,
val environmentListener: EnvironmentListener,
val storageStatusListener: StorageStatusListener,
val executorsListener: ExecutorsListener,
val jobProgressListener: JobProgressListener,
Expand All @@ -71,7 +69,7 @@ private[spark] class SparkUI private (
val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new EnvironmentTab(this, store))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
Expand All @@ -86,7 +84,11 @@ private[spark] class SparkUI private (
initialize()

def getSparkUser: String = {
environmentListener.systemProperties.toMap.getOrElse("user.name", "<unknown>")
try {
store.environmentInfo().systemProperties.toMap.getOrElse("user.name", "<unknown>")
} catch {
case _: NoSuchElementException => "<unknown>"
}
}

def getAppName: String = appName
Expand Down Expand Up @@ -138,6 +140,7 @@ private[spark] class SparkUI private (
def setStreamingJobProgressListener(sparkListener: SparkListener): Unit = {
streamingJobProgressListener = Option(sparkListener)
}

}

private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
Expand Down Expand Up @@ -176,21 +179,19 @@ private[spark] object SparkUI {
listenerBus.addListener(listener)
listener
}
val environmentListener = new EnvironmentListener

val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)

listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)

new SparkUI(store, sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, jobProgressListener, storageListener, operationGraphListener,
appName, basePath, startTime)
new SparkUI(store, sc, conf, securityManager, storageStatusListener, executorsListener,
jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime)
}

}
31 changes: 23 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.SparkConf
import org.apache.spark.status.AppStateStore
import org.apache.spark.ui._
import org.apache.spark.util.Utils

private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
private val listener = parent.listener
private[ui] class EnvironmentPage(
parent: EnvironmentTab,
conf: SparkConf,
store: AppStateStore) extends WebUIPage("") {

def render(request: HttpServletRequest): Seq[Node] = {
val appEnv = store.environmentInfo()
val jvmInformation = Map(
"Java Version" -> appEnv.runtime.javaVersion,
"Java Home" -> appEnv.runtime.javaHome,
"Scala Version" -> appEnv.runtime.scalaVersion)

val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
propertyHeader, jvmRow, jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(propertyHeader, propertyRow,
Utils.redact(parent.conf, listener.sparkProperties), fixedWidth = true)

Utils.redact(conf, appEnv.sparkProperties.toSeq), fixedWidth = true)
val systemPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
propertyHeader, propertyRow, appEnv.systemProperties, fixedWidth = true)
val classpathEntriesTable = UIUtils.listingTable(
classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
classPathHeaders, classPathRow, appEnv.classpathEntries, fixedWidth = true)
val content =
<span>
<h4>Runtime Information</h4> {runtimeInformationTable}
Expand All @@ -54,3 +63,9 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("")
private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
}

/**
 * Web UI tab showing the application's environment (runtime, Spark/system
 * properties, classpath), backed by the [[AppStateStore]] rather than a
 * dedicated listener.
 */
private[ui] class EnvironmentTab(
parent: SparkUI,
store: AppStateStore) extends SparkUITab(parent, "environment") {
attachPage(new EnvironmentPage(this, parent.conf, store))
}
51 changes: 0 additions & 51 deletions core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,46 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter {
Utils.deleteRecursively(testDir)
}

test("environment info") {
  val listener = new AppStateListener(store, conf, true)

  // Fake environment payload mirroring what the driver reports at startup.
  val jvmInfo = Seq(
    "Java Version" -> sys.props("java.version"),
    "Java Home" -> sys.props("java.home"),
    "Scala Version" -> scala.util.Properties.versionString)
  val sparkProps = Seq("spark.conf.1" -> "1", "spark.conf.2" -> "2")
  val sysProps = Seq("sys.prop.1" -> "1", "sys.prop.2" -> "2")
  val classpath = Seq("/jar1" -> "System", "/jar2" -> "User")

  val details = Map(
    "JVM Information" -> jvmInfo,
    "Spark Properties" -> sparkProps,
    "System Properties" -> sysProps,
    "Classpath Entries" -> classpath)

  listener.onEnvironmentUpdate(SparkListenerEnvironmentUpdate(details))

  // The snapshot is stored under a fixed key: the wrapper's class name.
  val appEnvKey = classOf[ApplicationEnvironmentInfoWrapper].getName()
  check[ApplicationEnvironmentInfoWrapper](appEnvKey) { wrapper =>
    val info = wrapper.info

    val expectedJvm = jvmInfo.toMap
    assert(info.runtime.javaVersion == expectedJvm("Java Version"))
    assert(info.runtime.javaHome == expectedJvm("Java Home"))
    assert(info.runtime.scalaVersion == expectedJvm("Scala Version"))

    assert(info.sparkProperties === sparkProps)
    assert(info.systemProperties === sysProps)
    assert(info.classpathEntries === classpath)
  }
}

test("scheduler events") {
val listener = new AppStateListener(store, conf, true)

Expand Down
1 change: 1 addition & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ object MimaExcludes {
// SPARK-18085: Better History Server scalability for many / large applications
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.env.EnvironmentListener"),
// [SPARK-20495][SQL] Add StorageLevel to cacheTable API
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable")
)
Expand Down