From d66024c50e46dcca5692b8478ee12fcfc8d6805f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 30 Jan 2017 14:06:15 -0800 Subject: [PATCH] SHS-NG M4.2: Port executors page to new backend. The executors page is built on top of the REST API, so the page itself was easy to hook up to the new code. Some other pages depend on the `ExecutorListener` class that is being removed, though, so they needed to be modified to use data from the new store. Fortunately, all they seemed to need is the map of executor logs, so that was somewhat easy too. The executor timeline graph required some extra code to save the executor-related events in the UI store. This just implements the existing functionality, without making any changes related to efficiency or scalability of that graph. I had to change some of the test golden files because the old code would return executors in "random" order (since it used a mutable Map instead of something that returns a sorted list), and the new code returns executors in id order. --- .../scheduler/cluster/ExecutorData.scala | 7 + .../spark/status/AppStateListener.scala | 12 + .../apache/spark/status/AppStateStore.scala | 15 +- .../api/v1/AllExecutorListResource.scala | 15 +- .../status/api/v1/ExecutorListResource.scala | 14 +- .../org/apache/spark/status/storeTypes.scala | 9 + .../scala/org/apache/spark/ui/SparkUI.scala | 11 +- .../ui/exec/ExecutorThreadDumpPage.scala | 9 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 154 ------------- .../apache/spark/ui/exec/ExecutorsTab.scala | 205 +++--------------- .../apache/spark/ui/jobs/AllJobsPage.scala | 3 +- .../apache/spark/ui/jobs/ExecutorTable.scala | 11 +- .../org/apache/spark/ui/jobs/JobPage.scala | 3 +- .../org/apache/spark/ui/jobs/JobsTab.scala | 3 +- .../org/apache/spark/ui/jobs/StagePage.scala | 18 +- .../org/apache/spark/ui/jobs/StagesTab.scala | 8 +- .../executor_memory_usage_expectation.json | 122 +++++------ ...xecutor_node_blacklisting_expectation.json | 112 +++++----- ...acklisting_unblacklisting_expectation.json | 84 +++---- .../org/apache/spark/ui/StagePageSuite.scala | 10 +- project/MimaExcludes.scala | 1 + 21 files changed, 273 insertions(+), 553 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index b25a4bfb501f..70fafa9b0b30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -17,6 +17,10 @@ package org.apache.spark.scheduler.cluster +import scala.annotation.meta.getter + +import com.fasterxml.jackson.annotation.JsonIgnore + import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} /** @@ -29,9 +33,12 @@ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} * @param totalCores The total number of cores available to the executor */ private[cluster] class ExecutorData( + @(JsonIgnore @getter) val executorEndpoint: RpcEndpointRef, + @(JsonIgnore @getter) val executorAddress: RpcAddress, override val executorHost: String, + @(JsonIgnore @getter) var freeCores: Int, override val totalCores: Int, override val logUrlMap: Map[String, String] diff --git a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala index 17a6b83810fd..32ed6ad5fd01 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala @@ -47,6 +47,7 @@ private[spark] class AppStateListener( private var appInfo: v1.ApplicationInfo = null private var coresPerTask: Int = 1 + private var executorEventId: Long = 0L // How often to update live entities. -1 means "never update" when replaying applications, // meaning only the last write will happen. For live applications, this avoids a few @@ -100,6 +101,8 @@ private[spark] class AppStateListener( details("System Properties"), details("Classpath Entries")) + coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt) + .getOrElse(coresPerTask) kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo)) } @@ -135,6 +138,8 @@ private[spark] class AppStateListener( exec.maxTasks = event.executorInfo.totalCores / coresPerTask exec.executorLogs = event.executorInfo.logUrlMap liveUpdate(exec) + + writeExecutorEvent(event) } override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { @@ -142,12 +147,19 @@ private[spark] class AppStateListener( exec.isActive = false update(exec) } + + writeExecutorEvent(event) } override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { updateBlackListStatus(event.executorId, true) } + private def writeExecutorEvent(event: SparkListenerEvent): Unit = { + executorEventId += 1 + kvstore.write(new ExecutorEventData(executorEventId, event)) + } + override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { updateBlackListStatus(event.executorId, false) } diff --git a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala index ce22b6bb9d35..aedd66241317 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.kvstore.{InMemoryStore, KVStore} -import org.apache.spark.scheduler.SparkListenerBus +import org.apache.spark.scheduler.{SparkListenerBus, SparkListenerEvent} import org.apache.spark.status.api.v1 import org.apache.spark.util.{Distribution, Utils} @@ -56,6 +56,15 @@ private[spark] class AppStateStore(store: KVStore) { .last(true).asScala.map(_.info).toSeq } + def executorSummary(executorId: String): Option[v1.ExecutorSummary] = { + try { + Some(store.read(classOf[ExecutorSummaryWrapper], executorId).info) + } catch { + case _: NoSuchElementException => + None + } + } + def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = { val it = store.view(classOf[StageDataWrapper]).sorted().asScala.map(_.info) if (!statuses.isEmpty) { @@ -198,6 +207,10 @@ private[spark] class AppStateStore(store: KVStore) { store.read(classOf[RDDStorageInfoWrapper], rddId).info } + def executorEvents(): Seq[SparkListenerEvent] = { + store.view(classOf[ExecutorEventData]).asScala.map(_.event).toSeq + } + def close(): Unit = { store.close() } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala index 01f2a18122e6..f4bc752d31f7 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala @@ -20,22 +20,11 @@ import javax.ws.rs.{GET, Produces} import javax.ws.rs.core.MediaType import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.exec.ExecutorsPage @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class AllExecutorListResource(ui: SparkUI) { @GET - def executorList(): Seq[ExecutorSummary] = { - val listener = ui.executorsListener - listener.synchronized { - // The follow codes should be protected by `listener` to make sure no executors will be - // removed before we query their status. See SPARK-12784. - (0 until listener.activeStorageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, isActive = true) - } ++ (0 until listener.deadStorageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, isActive = false) - } - } - } + def executorList(): Seq[ExecutorSummary] = ui.store.executorList(false) + } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index ab5388159418..03f359262332 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -20,21 +20,11 @@ import javax.ws.rs.{GET, Produces} import javax.ws.rs.core.MediaType import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.exec.ExecutorsPage @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class ExecutorListResource(ui: SparkUI) { @GET - def executorList(): Seq[ExecutorSummary] = { - val listener = ui.executorsListener - listener.synchronized { - // The follow codes should be protected by `listener` to make sure no executors will be - // removed before we query their status. See SPARK-12784. - val storageStatusList = listener.activeStorageStatusList - (0 until storageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, isActive = true) - } - } - } + def executorList(): Seq[ExecutorSummary] = ui.store.executorList(true) + } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index e8f89fcbd458..12ad4f383beb 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -22,6 +22,7 @@ import java.lang.{Integer => JInteger, Long => JLong} import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.spark.kvstore.KVIndex +import org.apache.spark.scheduler.SparkListenerEvent import org.apache.spark.status.api.v1._ import org.apache.spark.status.KVUtils._ @@ -132,3 +133,11 @@ private[spark] class ExecutorStageSummaryWrapper( private[this] val stage: Array[Int] = Array(stageId, stageAttemptId) } + +/** + * Store raw executor events so that the executor timeline can be drawn. The event is wrapped + * in a container so that a monotonically increasing ID can be added to it. + */ +private[spark] class ExecutorEventData( + @KVIndexParam val id: Long, + val event: SparkListenerEvent) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 9b5837c6f624..6422690f17cb 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -29,7 +29,7 @@ import org.apache.spark.status.api.v1._ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.env.EnvironmentTab -import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} +import org.apache.spark.ui.exec.ExecutorsTab import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener import org.apache.spark.ui.storage.{StorageListener, StorageTab} @@ -44,7 +44,6 @@ private[spark] class SparkUI private ( val conf: SparkConf, securityManager: SecurityManager, val storageStatusListener: StorageStatusListener, - val executorsListener: ExecutorsListener, val jobProgressListener: JobProgressListener, val storageListener: StorageListener, val operationGraphListener: RDDOperationGraphListener, @@ -66,7 +65,7 @@ private[spark] class SparkUI private ( def initialize() { val jobsTab = new JobsTab(this) attachTab(jobsTab) - val stagesTab = new StagesTab(this) + val stagesTab = new StagesTab(this, store) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this, store)) @@ -181,17 +180,15 @@ private[spark] object SparkUI { } val storageStatusListener = new StorageStatusListener(conf) - val executorsListener = new ExecutorsListener(storageStatusListener, conf) val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(storageStatusListener) - listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) listenerBus.addListener(operationGraphListener) - new SparkUI(store, sc, conf, securityManager, storageStatusListener, executorsListener, - jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime) + new SparkUI(store, sc, conf, securityManager, storageStatusListener, jobProgressListener, + storageListener, operationGraphListener, appName, basePath, startTime) } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index 6ce3f511e89c..483f94ed0fa0 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -22,11 +22,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{Node, Text} -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.SparkContext +import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage} -private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { - - private val sc = parent.sc +private[ui] class ExecutorThreadDumpPage( + parent: SparkUITab, + sc: Option[SparkContext]) extends WebUIPage("threadDump") { def render(request: HttpServletRequest): Seq[Node] = { val executorId = Option(request.getParameter("executorId")).map { executorId => diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala deleted file mode 100644 index b7cbed468517..000000000000 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ui.exec - -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics} -import org.apache.spark.ui.{UIUtils, WebUIPage} - -// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive -private[ui] case class ExecutorSummaryInfo( - id: String, - hostPort: String, - rddBlocks: Int, - memoryUsed: Long, - diskUsed: Long, - activeTasks: Int, - failedTasks: Int, - completedTasks: Int, - totalTasks: Int, - totalDuration: Long, - totalInputBytes: Long, - totalShuffleRead: Long, - totalShuffleWrite: Long, - isBlacklisted: Int, - maxOnHeapMem: Long, - maxOffHeapMem: Long, - executorLogs: Map[String, String]) - - -private[ui] class ExecutorsPage( - parent: ExecutorsTab, - threadDumpEnabled: Boolean) - extends WebUIPage("") { - - def render(request: HttpServletRequest): Seq[Node] = { - val content = -
- { -
- - - Show Additional Metrics - - -
++ -
++ - ++ - ++ - - } -
- - UIUtils.headerSparkPage("Executors", content, parent, useDataTables = true) - } -} - -private[spark] object ExecutorsPage { - private val ON_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for on heap " + - "storage of data like RDD partitions cached in memory." - private val OFF_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for off heap " + - "storage of data like RDD partitions cached in memory." - - /** Represent an executor's info as a map given a storage status index */ - def getExecInfo( - listener: ExecutorsListener, - statusId: Int, - isActive: Boolean): ExecutorSummary = { - val status = if (isActive) { - listener.activeStorageStatusList(statusId) - } else { - listener.deadStorageStatusList(statusId) - } - val execId = status.blockManagerId.executorId - val hostPort = status.blockManagerId.hostPort - val rddBlocks = status.numBlocks - val memUsed = status.memUsed - val maxMem = status.maxMem - val memoryMetrics = for { - onHeapUsed <- status.onHeapMemUsed - offHeapUsed <- status.offHeapMemUsed - maxOnHeap <- status.maxOnHeapMem - maxOffHeap <- status.maxOffHeapMem - } yield { - new MemoryMetrics(onHeapUsed, offHeapUsed, maxOnHeap, maxOffHeap) - } - - - val diskUsed = status.diskUsed - val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId)) - - new ExecutorSummary( - execId, - hostPort, - isActive, - rddBlocks, - memUsed, - diskUsed, - taskSummary.totalCores, - taskSummary.tasksMax, - taskSummary.tasksActive, - taskSummary.tasksFailed, - taskSummary.tasksComplete, - taskSummary.tasksActive + taskSummary.tasksFailed + taskSummary.tasksComplete, - taskSummary.duration, - taskSummary.jvmGCTime, - taskSummary.inputBytes, - taskSummary.shuffleRead, - taskSummary.shuffleWrite, - taskSummary.isBlacklisted, - maxMem, - taskSummary.executorLogs, - memoryMetrics - ) - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index aabf6e0c63c0..310d22d10677 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -17,192 +17,45 @@ package org.apache.spark.ui.exec -import scala.collection.mutable.{LinkedHashMap, ListBuffer} +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node -import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext} -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ -import org.apache.spark.storage.{StorageStatus, StorageStatusListener} -import org.apache.spark.ui.{SparkUI, SparkUITab} +import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils, WebUIPage} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { - val listener = parent.executorsListener - val sc = parent.sc - val threadDumpEnabled = - sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) - attachPage(new ExecutorsPage(this, threadDumpEnabled)) - if (threadDumpEnabled) { - attachPage(new ExecutorThreadDumpPage(this)) - } -} + init() -private[ui] case class ExecutorTaskSummary( - var executorId: String, - var totalCores: Int = 0, - var tasksMax: Int = 0, - var tasksActive: Int = 0, - var tasksFailed: Int = 0, - var tasksComplete: Int = 0, - var duration: Long = 0L, - var jvmGCTime: Long = 0L, - var inputBytes: Long = 0L, - var inputRecords: Long = 0L, - var outputBytes: Long = 0L, - var outputRecords: Long = 0L, - var shuffleRead: Long = 0L, - var shuffleWrite: Long = 0L, - var executorLogs: Map[String, String] = Map.empty, - var isAlive: Boolean = true, - var isBlacklisted: Boolean = false -) - -/** - * :: DeveloperApi :: - * A SparkListener that prepares information to be displayed on the ExecutorsTab - */ -@DeveloperApi -@deprecated("This class will be removed in a future release.", "2.2.0") -class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf) - extends SparkListener { - val executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]() - var executorEvents = new ListBuffer[SparkListenerEvent]() + private def init(): Unit = { + val threadDumpEnabled = + parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) - private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000) - private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100) - - def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList - - def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList - - override def onExecutorAdded( - executorAdded: SparkListenerExecutorAdded): Unit = synchronized { - val eid = executorAdded.executorId - val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap - taskSummary.totalCores = executorAdded.executorInfo.totalCores - taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1) - executorEvents += executorAdded - if (executorEvents.size > maxTimelineExecutors) { - executorEvents.remove(0) + attachPage(new ExecutorsPage(this, threadDumpEnabled)) + if (threadDumpEnabled) { + attachPage(new ExecutorThreadDumpPage(this, parent.sc)) } - - val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive) - if (deadExecutors.size > retainedDeadExecutors) { - val head = deadExecutors.head - executorToTaskSummary.remove(head._1) - } - } - - override def onExecutorRemoved( - executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized { - executorEvents += executorRemoved - if (executorEvents.size > maxTimelineExecutors) { - executorEvents.remove(0) - } - executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false) } - override def onApplicationStart( - applicationStart: SparkListenerApplicationStart): Unit = { - applicationStart.driverLogs.foreach { logs => - val storageStatus = activeStorageStatusList.find { s => - s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER || - s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER - } - storageStatus.foreach { s => - val eid = s.blockManagerId.executorId - val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskSummary.executorLogs = logs.toMap - } - } - } - - override def onTaskStart( - taskStart: SparkListenerTaskStart): Unit = synchronized { - val eid = taskStart.taskInfo.executorId - val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskSummary.tasksActive += 1 - } - - override def onTaskEnd( - taskEnd: SparkListenerTaskEnd): Unit = synchronized { - val info = taskEnd.taskInfo - if (info != null) { - val eid = info.executorId - val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskEnd.reason match { - case Resubmitted => - // Note: For resubmitted tasks, we continue to use the metrics that belong to the - // first attempt of this task. This may not be 100% accurate because the first attempt - // could have failed half-way through. The correct fix would be to keep track of the - // metrics added by each attempt, but this is much more complicated. - return - case _: ExceptionFailure => - taskSummary.tasksFailed += 1 - case _ => - taskSummary.tasksComplete += 1 - } - if (taskSummary.tasksActive >= 1) { - taskSummary.tasksActive -= 1 - } - taskSummary.duration += info.duration - - // Update shuffle read/write - val metrics = taskEnd.taskMetrics - if (metrics != null) { - taskSummary.inputBytes += metrics.inputMetrics.bytesRead - taskSummary.inputRecords += metrics.inputMetrics.recordsRead - taskSummary.outputBytes += metrics.outputMetrics.bytesWritten - taskSummary.outputRecords += metrics.outputMetrics.recordsWritten - - taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead - taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten - taskSummary.jvmGCTime += metrics.jvmGCTime - } - } - } - - private def updateExecutorBlacklist( - eid: String, - isBlacklisted: Boolean): Unit = { - val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - execTaskSummary.isBlacklisted = isBlacklisted - } - - override def onExecutorBlacklisted( - executorBlacklisted: SparkListenerExecutorBlacklisted) - : Unit = synchronized { - updateExecutorBlacklist(executorBlacklisted.executorId, true) - } - - override def onExecutorUnblacklisted( - executorUnblacklisted: SparkListenerExecutorUnblacklisted) - : Unit = synchronized { - updateExecutorBlacklist(executorUnblacklisted.executorId, false) - } - - override def onNodeBlacklisted( - nodeBlacklisted: SparkListenerNodeBlacklisted) - : Unit = synchronized { - // Implicitly blacklist every executor associated with this node, and show this in the UI. - activeStorageStatusList.foreach { status => - if (status.blockManagerId.host == nodeBlacklisted.hostId) { - updateExecutorBlacklist(status.blockManagerId.executorId, true) - } - } - } +} - override def onNodeUnblacklisted( - nodeUnblacklisted: SparkListenerNodeUnblacklisted) - : Unit = synchronized { - // Implicitly unblacklist every executor associated with this node, regardless of how - // they may have been blacklisted initially (either explicitly through executor blacklisting - // or implicitly through node blacklisting). Show this in the UI. - activeStorageStatusList.foreach { status => - if (status.blockManagerId.host == nodeUnblacklisted.hostId) { - updateExecutorBlacklist(status.blockManagerId.executorId, false) - } - } +private[ui] class ExecutorsPage( + parent: SparkUITab, + threadDumpEnabled: Boolean) + extends WebUIPage("") { + + def render(request: HttpServletRequest): Seq[Node] = { + val content = +
+ { +
++ + ++ + ++ + + } +
+ + UIUtils.headerSparkPage("Executors", content, parent, useDataTables = true) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 18be0870746e..5818cc364432 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -358,9 +358,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { var content = summary - val executorListener = parent.executorListener content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs, - executorListener.executorEvents, startTime) + parent.parent.store.executorEvents(), startTime) if (shouldShowActiveJobs) { content ++=

Active Jobs ({activeJobs.size})

++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 382a6f979f2e..7cd625925c2c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -20,12 +20,17 @@ package org.apache.spark.ui.jobs import scala.collection.mutable import scala.xml.{Node, Unparsed} +import org.apache.spark.status.AppStateStore import org.apache.spark.ui.{ToolTips, UIUtils} import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils /** Stage summary grouped by executors. */ -private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) { +private[ui] class ExecutorTable( + stageId: Int, + stageAttemptId: Int, + parent: StagesTab, + store: AppStateStore) { private val listener = parent.progressListener def toNodeSeq: Seq[Node] = { @@ -123,9 +128,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
{k}
{ - val logs = parent.executorsListener.executorToTaskSummary.get(k) - .map(_.executorLogs).getOrElse(Map.empty) - logs.map { + store.executorSummary(k).map(_.executorLogs).getOrElse(Map.empty).map { case (logName, logUrl) =>
{logName}
} } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 3131c4a1eb7d..dd7e11fef1ba 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -321,11 +321,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { var content = summary val appStartTime = listener.startTime - val executorListener = parent.executorListener val operationGraphListener = parent.operationGraphListener content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, - executorListener.executorEvents, appStartTime) + parent.parent.store.executorEvents(), appStartTime) content ++= UIUtils.showDagVizForJob( jobId, operationGraphListener.getOperationGraphForJob(jobId)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 620c54c2dc0a..13b2ba11f6bc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -23,11 +23,10 @@ import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.{SparkUI, SparkUITab} /** Web UI showing progress status of all jobs in the given SparkContext. */ -private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { +private[ui] class JobsTab(val parent: SparkUI) extends SparkUITab(parent, "jobs") { val sc = parent.sc val killEnabled = parent.killEnabled val jobProgresslistener = parent.jobProgressListener - val executorListener = parent.executorsListener val operationGraphListener = parent.operationGraphListener def isFairScheduler: Boolean = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 19325a2dc916..db79dd90f165 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -29,18 +29,17 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.SparkConf import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} +import org.apache.spark.status.AppStateStore import org.apache.spark.ui._ -import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Distribution, Utils} /** Page showing statistics and task list for a given stage */ -private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +private[ui] class StagePage(parent: StagesTab, store: AppStateStore) extends WebUIPage("stage") { import StagePage._ private val progressListener = parent.progressListener private val operationGraphListener = parent.operationGraphListener - private val executorsListener = parent.executorsListener private val TIMELINE_LEGEND = {
@@ -302,7 +301,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { pageSize = taskPageSize, sortColumn = taskSortColumn, desc = taskSortDesc, - executorsListener = executorsListener + store = store ) (_taskTable, _taskTable.table(page)) } catch { @@ -561,7 +560,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { stripeRowsWithCss = false)) } - val executorTable = new ExecutorTable(stageId, stageAttemptId, parent) + val executorTable = new ExecutorTable(stageId, stageAttemptId, parent, store) val maybeAccumulableTable: Seq[Node] = if (hasAccumulators) {

Accumulators

++ accumulableTable } else Seq() @@ -865,7 +864,7 @@ private[ui] class TaskDataSource( pageSize: Int, sortColumn: String, desc: Boolean, - executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) { + store: AppStateStore) extends PagedDataSource[TaskTableRowData](pageSize) { import StagePage._ // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table @@ -1007,8 +1006,7 @@ private[ui] class TaskDataSource( None } - val logs = executorsListener.executorToTaskSummary.get(info.executorId) - .map(_.executorLogs).getOrElse(Map.empty) + val logs = store.executorSummary(info.executorId).map(_.executorLogs).getOrElse(Map.empty) new TaskTableRowData( info.index, info.taskId, @@ -1154,7 +1152,7 @@ private[ui] class TaskPagedTable( pageSize: Int, sortColumn: String, desc: Boolean, - executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] { + store: AppStateStore) extends PagedTable[TaskTableRowData] { override def tableId: String = "task-table" @@ -1179,7 +1177,7 @@ private[ui] class TaskPagedTable( pageSize, sortColumn, desc, - executorsListener) + store) override def pageLink(page: Int): String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 181465bdf960..5c8e1aba2ee2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -20,19 +20,21 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.status.AppStateStore import org.apache.spark.ui.{SparkUI, SparkUITab} /** Web UI showing progress status of all stages in the given SparkContext. */ -private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") { +private[ui] class StagesTab(val parent: SparkUI, store: AppStateStore) + extends SparkUITab(parent, "stages") { + val sc = parent.sc val conf = parent.conf val killEnabled = parent.killEnabled val progressListener = parent.jobProgressListener val operationGraphListener = parent.operationGraphListener - val executorsListener = parent.executorsListener attachPage(new AllStagesPage(this)) - attachPage(new StagePage(this)) + attachPage(new StagePage(this, store)) attachPage(new PoolPage(this)) def isFairScheduler: Boolean = progressListener.schedulingMode == Some(SchedulingMode.FAIR) diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json index 0f94e3b255db..0ad0ccca5c6e 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json @@ -1,6 +1,33 @@ [ { - "id" : "2", - "hostPort" : "172.22.0.167:51487", + "id" : "driver", + "hostPort" : "172.22.0.167:51475", + "isActive" : true, + "rddBlocks" : 0, + "memoryUsed" : 0, + "diskUsed" : 0, + "totalCores" : 0, + "maxTasks" : 0, + "activeTasks" : 0, + "failedTasks" : 0, + "completedTasks" : 0, + "totalTasks" : 0, + "totalDuration" : 0, + "totalGCTime" : 0, + "totalInputBytes" : 0, + "totalShuffleRead" : 0, + "totalShuffleWrite" : 0, + "isBlacklisted" : true, + "maxMemory" : 908381388, + "executorLogs" : { }, + "memoryMetrics" : { + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 + } +}, { + "id" : "3", + "hostPort" : "172.22.0.167:51485", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, @@ -8,52 +35,55 @@ "totalCores" : 4, "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 4, - "completedTasks" : 0, - "totalTasks" : 4, - "totalDuration" : 2537, - "totalGCTime" : 88, + "failedTasks" : 0, + "completedTasks" : 12, + "totalTasks" : 12, + "totalDuration" : 2453, + "totalGCTime" : 72, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : true, "maxMemory" : 908381388, "executorLogs" : { - "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout", - "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr" + "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout", + "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr" }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } -}, { - "id" : "driver", - "hostPort" : "172.22.0.167:51475", +} ,{ + "id" : "2", + "hostPort" : "172.22.0.167:51487", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, - "totalCores" : 0, - "maxTasks" : 0, + "totalCores" : 4, + "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 0, + "failedTasks" : 4, "completedTasks" : 0, - "totalTasks" : 0, - "totalDuration" : 0, - "totalGCTime" : 0, + "totalTasks" : 4, + "totalDuration" : 2537, + "totalGCTime" : 88, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : true, "maxMemory" : 908381388, - "executorLogs" : { }, + "executorLogs" : { + "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout", + "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr" + }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } }, { "id" : "1", @@ -110,39 +140,9 @@ "stderr" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr" }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 - } -}, { - "id" : "3", - "hostPort" : "172.22.0.167:51485", - "isActive" : true, - "rddBlocks" : 0, - "memoryUsed" : 0, - "diskUsed" : 0, - "totalCores" : 4, - "maxTasks" : 4, - "activeTasks" : 0, - "failedTasks" : 0, - "completedTasks" : 12, - "totalTasks" : 12, - "totalDuration" : 2453, - "totalGCTime" : 72, - "totalInputBytes" : 0, - "totalShuffleRead" : 0, - "totalShuffleWrite" : 0, - "isBlacklisted" : true, - "maxMemory" : 908381388, - "executorLogs" : { - "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout", - "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr" - }, - "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json index 0f94e3b255db..7727bef0178b 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json @@ -1,6 +1,33 @@ [ { - "id" : "2", - "hostPort" : "172.22.0.167:51487", + "id" : "driver", + "hostPort" : "172.22.0.167:51475", + "isActive" : true, + "rddBlocks" : 0, + "memoryUsed" : 0, + "diskUsed" : 0, + "totalCores" : 0, + "maxTasks" : 0, + "activeTasks" : 0, + "failedTasks" : 0, + "completedTasks" : 0, + "totalTasks" : 0, + "totalDuration" : 0, + "totalGCTime" : 0, + "totalInputBytes" : 0, + "totalShuffleRead" : 0, + "totalShuffleWrite" : 0, + "isBlacklisted" : true, + "maxMemory" : 908381388, + "executorLogs" : { }, + "memoryMetrics": { + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 + } +}, { + "id" : "3", + "hostPort" : "172.22.0.167:51485", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, @@ -8,52 +35,55 @@ "totalCores" : 4, "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 4, - "completedTasks" : 0, - "totalTasks" : 4, - "totalDuration" : 2537, - "totalGCTime" : 88, + "failedTasks" : 0, + "completedTasks" : 12, + "totalTasks" : 12, + "totalDuration" : 2453, + "totalGCTime" : 72, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : true, "maxMemory" : 908381388, "executorLogs" : { - "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout", - "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr" + "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout", + "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr" }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } }, { - "id" : "driver", - "hostPort" : "172.22.0.167:51475", + "id" : "2", + "hostPort" : "172.22.0.167:51487", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, - "totalCores" : 0, - "maxTasks" : 0, + "totalCores" : 4, + "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 0, + "failedTasks" : 4, "completedTasks" : 0, - "totalTasks" : 0, - "totalDuration" : 0, - "totalGCTime" : 0, + "totalTasks" : 4, + "totalDuration" : 2537, + "totalGCTime" : 88, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : true, "maxMemory" : 908381388, - "executorLogs" : { }, + "executorLogs" : { + "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout", + "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr" + }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } }, { "id" : "1", @@ -115,34 +145,4 @@ "totalOnHeapStorageMemory": 384093388, "totalOffHeapStorageMemory": 524288000 } -}, { - "id" : "3", - "hostPort" : "172.22.0.167:51485", - "isActive" : true, - "rddBlocks" : 0, - "memoryUsed" : 0, - "diskUsed" : 0, - "totalCores" : 4, - "maxTasks" : 4, - "activeTasks" : 0, - "failedTasks" : 0, - "completedTasks" : 12, - "totalTasks" : 12, - "totalDuration" : 2453, - "totalGCTime" : 72, - "totalInputBytes" : 0, - "totalShuffleRead" : 0, - "totalShuffleWrite" : 0, - "isBlacklisted" : true, - "maxMemory" : 908381388, - "executorLogs" : { - "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout", - "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr" - }, - "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 - } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json index 92e249c85111..4a8539a8558b 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json @@ -1,6 +1,27 @@ [ { - "id" : "2", - "hostPort" : "172.22.0.111:64539", + "id" : "driver", + "hostPort" : "172.22.0.111:64527", + "isActive" : true, + "rddBlocks" : 0, + "memoryUsed" : 0, + "diskUsed" : 0, + "totalCores" : 0, + "maxTasks" : 0, + "activeTasks" : 0, + "failedTasks" : 0, + "completedTasks" : 0, + "totalTasks" : 0, + "totalDuration" : 0, + "totalGCTime" : 0, + "totalInputBytes" : 0, + "totalShuffleRead" : 0, + "totalShuffleWrite" : 0, + "isBlacklisted" : false, + "maxMemory" : 384093388, + "executorLogs" : { } +}, { + "id" : "3", + "hostPort" : "172.22.0.111:64543", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, @@ -8,41 +29,44 @@ "totalCores" : 4, "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 6, - "completedTasks" : 0, - "totalTasks" : 6, - "totalDuration" : 2792, - "totalGCTime" : 128, + "failedTasks" : 0, + "completedTasks" : 4, + "totalTasks" : 4, + "totalDuration" : 3457, + "totalGCTime" : 72, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : false, "maxMemory" : 384093388, "executorLogs" : { - "stdout" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout", - "stderr" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr" + "stdout" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout", + "stderr" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr" } }, { - "id" : "driver", - "hostPort" : "172.22.0.111:64527", + "id" : "2", + "hostPort" : "172.22.0.111:64539", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, - "totalCores" : 0, - "maxTasks" : 0, + "totalCores" : 4, + "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 0, + "failedTasks" : 6, "completedTasks" : 0, - "totalTasks" : 0, - "totalDuration" : 0, - "totalGCTime" : 0, + "totalTasks" : 6, + "totalDuration" : 2792, + "totalGCTime" : 128, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : false, "maxMemory" : 384093388, - "executorLogs" : { } + "executorLogs" : { + "stdout" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout", + "stderr" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr" + } }, { "id" : "1", "hostPort" : "172.22.0.111:64541", @@ -91,28 +115,4 @@ "stdout" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stdout", "stderr" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stderr" } -}, { - "id" : "3", - "hostPort" : "172.22.0.111:64543", - "isActive" : true, - "rddBlocks" : 0, - "memoryUsed" : 0, - "diskUsed" : 0, - "totalCores" : 4, - "maxTasks" : 4, - "activeTasks" : 0, - "failedTasks" : 0, - "completedTasks" : 4, - "totalTasks" : 4, - "totalDuration" : 3457, - "totalGCTime" : 72, - "totalInputBytes" : 0, - "totalShuffleRead" : 0, - "totalShuffleWrite" : 0, - "isBlacklisted" : false, - "maxMemory" : 384093388, - "executorLogs" : { - "stdout" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout", - "stderr" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr" - } } ] diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 499d47b13d70..4d8ef8ab97ef 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -22,13 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.mockito.Matchers.anyString import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ +import org.apache.spark.status.AppStateStore import org.apache.spark.storage.StorageStatusListener -import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener @@ -55,20 +56,21 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { * This also runs a dummy stage to populate the page with useful content. */ private def renderStagePage(conf: SparkConf): Seq[Node] = { + val store = mock(classOf[AppStateStore]) + when(store.executorSummary(anyString())).thenReturn(None) + val jobListener = new JobProgressListener(conf) val graphListener = new RDDOperationGraphListener(conf) - val executorsListener = new ExecutorsListener(new StorageStatusListener(conf), conf) val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS) val request = mock(classOf[HttpServletRequest]) when(tab.conf).thenReturn(conf) when(tab.progressListener).thenReturn(jobListener) when(tab.operationGraphListener).thenReturn(graphListener) - when(tab.executorsListener).thenReturn(executorsListener) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) when(request.getParameter("id")).thenReturn("0") when(request.getParameter("attempt")).thenReturn("0") - val page = new StagePage(tab) + val page = new StagePage(tab, store) // Simulate a stage in job progress listener val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details") diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 724b5102b7b4..4bd6c3e206e8 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -40,6 +40,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.env.EnvironmentListener"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.exec.ExecutorsListener"), // [SPARK-20495][SQL] Add StorageLevel to cacheTable API ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable") )