diff --git a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala
index 0726a029e8a0..17a6b83810fd 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala
@@ -86,7 +86,21 @@ private[spark] class AppStateListener(
   }
 
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
-    // TODO
+    val details = event.environmentDetails
+
+    val jvmInfo = Map(details("JVM Information"): _*)
+    val runtime = new v1.RuntimeInfo(
+      jvmInfo("Java Version"),
+      jvmInfo("Java Home"),
+      jvmInfo("Scala Version"))
+
+    val envInfo = new v1.ApplicationEnvironmentInfo(
+      runtime,
+      details("Spark Properties"),
+      details("System Properties"),
+      details("Classpath Entries"))
+
+    kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
   }
 
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
index 032e8493a826..ce22b6bb9d35 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
@@ -33,6 +33,11 @@ import org.apache.spark.util.{Distribution, Utils}
  */
 private[spark] class AppStateStore(store: KVStore) {
 
+  def environmentInfo(): v1.ApplicationEnvironmentInfo = {
+    val klass = classOf[ApplicationEnvironmentInfoWrapper]
+    store.read(klass, klass.getName()).info
+  }
+
   def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = {
     val it = store.view(classOf[JobDataWrapper]).sorted().asScala.map(_.info)
     if (!statuses.isEmpty()) {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala
index 739a8aceae86..e702f8aa2ef2 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala
@@ -26,20 +26,7 @@ private[v1] class ApplicationEnvironmentResource(ui: SparkUI) {
 
   @GET
   def getEnvironmentInfo(): ApplicationEnvironmentInfo = {
-    val listener = ui.environmentListener
-    listener.synchronized {
-      val jvmInfo = Map(listener.jvmInformation: _*)
-      val runtime = new RuntimeInfo(
-        jvmInfo("Java Version"),
-        jvmInfo("Java Home"),
-        jvmInfo("Scala Version"))
-
-      new ApplicationEnvironmentInfo(
-        runtime,
-        listener.sparkProperties,
-        listener.systemProperties,
-        listener.classpathEntries)
-    }
+    ui.store.environmentInfo()
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 08d75aae2764..e8f89fcbd458 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -35,6 +35,17 @@ private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) {
 
 }
 
+private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvironmentInfo) {
+
+  /**
+   * There's always a single ApplicationEnvironmentInfo object per application, so this
+   * ID doesn't need to be dynamic. But the KVStore API requires an ID.
+   */
+  @JsonIgnore @KVIndex
+  def id: String = classOf[ApplicationEnvironmentInfoWrapper].getName()
+
+}
+
 private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {
 
   @JsonIgnore @KVIndex
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 8d6ac55c05b3..9b5837c6f624 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -25,11 +25,10 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStateStore
-import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
-  UIRoot}
+import org.apache.spark.status.api.v1._
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
+import org.apache.spark.ui.env.EnvironmentTab
 import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
 import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
 import org.apache.spark.ui.scope.RDDOperationGraphListener
@@ -44,7 +43,6 @@ private[spark] class SparkUI private (
     val sc: Option[SparkContext],
     val conf: SparkConf,
     securityManager: SecurityManager,
-    val environmentListener: EnvironmentListener,
     val storageStatusListener: StorageStatusListener,
     val executorsListener: ExecutorsListener,
     val jobProgressListener: JobProgressListener,
@@ -71,7 +69,7 @@ private[spark] class SparkUI private (
     val stagesTab = new StagesTab(this)
     attachTab(stagesTab)
     attachTab(new StorageTab(this))
-    attachTab(new EnvironmentTab(this))
+    attachTab(new EnvironmentTab(this, store))
     attachTab(new ExecutorsTab(this))
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
     attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
@@ -86,7 +84,11 @@ private[spark] class SparkUI private (
   initialize()
 
   def getSparkUser: String = {
-    environmentListener.systemProperties.toMap.getOrElse("user.name", "")
+    try {
+      store.environmentInfo().systemProperties.toMap.getOrElse("user.name", "")
+    } catch {
+      case _: NoSuchElementException => ""
+    }
   }
 
   def getAppName: String = appName
@@ -138,6 +140,7 @@ private[spark] class SparkUI private (
   def setStreamingJobProgressListener(sparkListener: SparkListener): Unit = {
     streamingJobProgressListener = Option(sparkListener)
   }
+
 }
 
 private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
@@ -176,21 +179,19 @@ private[spark] object SparkUI {
       listenerBus.addListener(listener)
       listener
     }
 
-    val environmentListener = new EnvironmentListener
+
     val storageStatusListener = new StorageStatusListener(conf)
     val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
 
-    listenerBus.addListener(environmentListener)
     listenerBus.addListener(storageStatusListener)
     listenerBus.addListener(executorsListener)
     listenerBus.addListener(storageListener)
     listenerBus.addListener(operationGraphListener)
 
-    new SparkUI(store, sc, conf, securityManager, environmentListener, storageStatusListener,
-      executorsListener, jobProgressListener, storageListener, operationGraphListener,
-      appName, basePath, startTime)
+    new SparkUI(store, sc, conf, securityManager, storageStatusListener, executorsListener,
+      jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
index b11f8f1555f1..17e4c3714524 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -21,22 +21,31 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.SparkConf
+import org.apache.spark.status.AppStateStore
+import org.apache.spark.ui._
 import org.apache.spark.util.Utils
 
-private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
-  private val listener = parent.listener
+private[ui] class EnvironmentPage(
+    parent: EnvironmentTab,
+    conf: SparkConf,
+    store: AppStateStore) extends WebUIPage("") {
 
   def render(request: HttpServletRequest): Seq[Node] = {
+    val appEnv = store.environmentInfo()
+    val jvmInformation = Map(
+      "Java Version" -> appEnv.runtime.javaVersion,
+      "Java Home" -> appEnv.runtime.javaHome,
+      "Scala Version" -> appEnv.runtime.scalaVersion)
+
     val runtimeInformationTable = UIUtils.listingTable(
-      propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
+      propertyHeader, jvmRow, jvmInformation, fixedWidth = true)
     val sparkPropertiesTable = UIUtils.listingTable(propertyHeader, propertyRow,
-      Utils.redact(parent.conf, listener.sparkProperties), fixedWidth = true)
-
+      Utils.redact(conf, appEnv.sparkProperties.toSeq), fixedWidth = true)
     val systemPropertiesTable = UIUtils.listingTable(
-      propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
+      propertyHeader, propertyRow, appEnv.systemProperties, fixedWidth = true)
     val classpathEntriesTable = UIUtils.listingTable(
-      classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
+      classPathHeaders, classPathRow, appEnv.classpathEntries, fixedWidth = true)
     val content =
       <span>
         <h4>Runtime Information</h4> {runtimeInformationTable}
@@ -54,3 +63,9 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("")
   private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
   private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
 }
+
+private[ui] class EnvironmentTab(
+    parent: SparkUI,
+    store: AppStateStore) extends SparkUITab(parent, "environment") {
+  attachPage(new EnvironmentPage(this, parent.conf, store))
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
deleted file mode 100644
index 8c18464e6477..000000000000
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.env
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.scheduler._
-import org.apache.spark.ui._
-
-private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
-  val listener = parent.environmentListener
-  val conf = parent.conf
-  attachPage(new EnvironmentPage(this))
-}
-
-/**
- * :: DeveloperApi ::
- * A SparkListener that prepares information to be displayed on the EnvironmentTab
- */
-@DeveloperApi
-@deprecated("This class will be removed in a future release.", "2.2.0")
-class EnvironmentListener extends SparkListener {
-  var jvmInformation = Seq[(String, String)]()
-  var sparkProperties = Seq[(String, String)]()
-  var systemProperties = Seq[(String, String)]()
-  var classpathEntries = Seq[(String, String)]()
-
-  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
-    synchronized {
-      val environmentDetails = environmentUpdate.environmentDetails
-      jvmInformation = environmentDetails("JVM Information")
-      sparkProperties = environmentDetails("Spark Properties")
-      systemProperties = environmentDetails("System Properties")
-      classpathEntries = environmentDetails("Classpath Entries")
-    }
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala
index 37cd63f5b89f..d0dfb80b4b2c 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala
@@ -56,6 +56,46 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter {
     Utils.deleteRecursively(testDir)
   }
 
+  test("environment info") {
+    val listener = new AppStateListener(store, conf, true)
+
+    val details = Map(
+      "JVM Information" -> Seq(
+        "Java Version" -> sys.props("java.version"),
+        "Java Home" -> sys.props("java.home"),
+        "Scala Version" -> scala.util.Properties.versionString
+      ),
+      "Spark Properties" -> Seq(
+        "spark.conf.1" -> "1",
+        "spark.conf.2" -> "2"
+      ),
+      "System Properties" -> Seq(
+        "sys.prop.1" -> "1",
+        "sys.prop.2" -> "2"
+      ),
+      "Classpath Entries" -> Seq(
+        "/jar1" -> "System",
+        "/jar2" -> "User"
+      )
+    )
+
+    listener.onEnvironmentUpdate(SparkListenerEnvironmentUpdate(details))
+
+    val appEnvKey = classOf[ApplicationEnvironmentInfoWrapper].getName()
+    check[ApplicationEnvironmentInfoWrapper](appEnvKey) { env =>
+      val info = env.info
+
+      val runtimeInfo = Map(details("JVM Information"): _*)
+      assert(info.runtime.javaVersion == runtimeInfo("Java Version"))
+      assert(info.runtime.javaHome == runtimeInfo("Java Home"))
+      assert(info.runtime.scalaVersion == runtimeInfo("Scala Version"))
+
+      assert(info.sparkProperties === details("Spark Properties"))
+      assert(info.systemProperties === details("System Properties"))
+      assert(info.classpathEntries === details("Classpath Entries"))
+    }
+  }
+
   test("scheduler events") {
     val listener = new AppStateListener(store, conf, true)
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index feba90ac2c95..724b5102b7b4 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -39,6 +39,7 @@
     // SPARK-18085: Better History Server scalability for many / large applications
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.env.EnvironmentListener"),
     // [SPARK-20495][SQL] Add StorageLevel to cacheTable API
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable")
   )