core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.scheduler.cluster
 
+import scala.annotation.meta.getter
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 
 /**
@@ -29,9 +33,12 @@ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
  * @param totalCores The total number of cores available to the executor
  */
 private[cluster] class ExecutorData(
+    @(JsonIgnore @getter)
     val executorEndpoint: RpcEndpointRef,
+    @(JsonIgnore @getter)
     val executorAddress: RpcAddress,
     override val executorHost: String,
+    @(JsonIgnore @getter)
     var freeCores: Int,
     override val totalCores: Int,
     override val logUrlMap: Map[String, String]
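
Note: the `@(JsonIgnore @getter)` syntax above is Scala's meta-annotation targeting. By default an annotation on a `val` constructor parameter stays on the parameter, where Jackson never looks; `@getter` retargets it to the compiler-generated getter. A minimal stand-alone sketch of the pattern (the class and field names are hypothetical, and a Scala-aware mapper is assumed, as Spark registers one):

import scala.annotation.meta.getter

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical class, not part of the patch: `handle` is hidden from
// serialization because @JsonIgnore lands on its generated getter.
class Payload(
    @(JsonIgnore @getter) val handle: Object,
    val name: String)

object PayloadDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    println(mapper.writeValueAsString(new Payload(new Object, "x"))) // {"name":"x"}
  }
}
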
core/src/main/scala/org/apache/spark/status/AppStateListener.scala (12 additions & 0 deletions)
@@ -47,6 +47,7 @@ private[spark] class AppStateListener(
 
   private var appInfo: v1.ApplicationInfo = null
   private var coresPerTask: Int = 1
+  private var executorEventId: Long = 0L
 
   // How often to update live entities. -1 means "never update" when replaying applications,
   // meaning only the last write will happen. For live applications, this avoids a few
@@ -100,6 +101,8 @@ private[spark] class AppStateListener(
       details("System Properties"),
       details("Classpath Entries"))
 
+    coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt)
+      .getOrElse(coresPerTask)
     kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
   }
 
@@ -135,19 +138,28 @@ private[spark] class AppStateListener(
     exec.maxTasks = event.executorInfo.totalCores / coresPerTask
     exec.executorLogs = event.executorInfo.logUrlMap
     liveUpdate(exec)
+
+    writeExecutorEvent(event)
   }
 
   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
     liveExecutors.remove(event.executorId).foreach { exec =>
       exec.isActive = false
       update(exec)
     }
+
+    writeExecutorEvent(event)
   }
 
   override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
     updateBlackListStatus(event.executorId, true)
   }
 
+  private def writeExecutorEvent(event: SparkListenerEvent): Unit = {
+    executorEventId += 1
+    kvstore.write(new ExecutorEventData(executorEventId, event))
+  }
+
   override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
     updateBlackListStatus(event.executorId, false)
   }
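
The listener stamps each stored event with a strictly increasing id before writing it. For context, a minimal stand-alone sketch of this write-ordering pattern (plain Scala, with a sorted map standing in for the kvstore; the names are illustrative, not Spark API):

import scala.collection.mutable

// Events are keyed by a monotonically increasing counter, so iterating the
// keys in ascending order replays the events in the order they arrived.
class EventLog[E] {
  private var nextId = 0L
  private val store = mutable.TreeMap[Long, E]()

  def write(event: E): Unit = {
    nextId += 1
    store(nextId) = event
  }

  def replay(): Seq[E] = store.values.toSeq // ascending id = arrival order
}
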
core/src/main/scala/org/apache/spark/status/AppStateStore.scala (14 additions & 1 deletion)
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.kvstore.{InMemoryStore, KVStore}
-import org.apache.spark.scheduler.SparkListenerBus
+import org.apache.spark.scheduler.{SparkListenerBus, SparkListenerEvent}
 import org.apache.spark.status.api.v1
 import org.apache.spark.util.{Distribution, Utils}
 
@@ -56,6 +56,15 @@ private[spark] class AppStateStore(store: KVStore) {
       .last(true).asScala.map(_.info).toSeq
   }
 
+  def executorSummary(executorId: String): Option[v1.ExecutorSummary] = {
+    try {
+      Some(store.read(classOf[ExecutorSummaryWrapper], executorId).info)
+    } catch {
+      case _: NoSuchElementException =>
+        None
+    }
+  }
+
   def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
     val it = store.view(classOf[StageDataWrapper]).sorted().asScala.map(_.info)
     if (!statuses.isEmpty) {
@@ -198,6 +207,10 @@ private[spark] class AppStateStore(store: KVStore) {
     store.read(classOf[RDDStorageInfoWrapper], rddId).info
   }
 
+  def executorEvents(): Seq[SparkListenerEvent] = {
+    store.view(classOf[ExecutorEventData]).asScala.map(_.event).toSeq
+  }
+
   def close(): Unit = {
     store.close()
   }
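
A hedged usage sketch of the new `executorSummary` helper (the `store` value and executor id "1" are hypothetical): converting the kvstore's NoSuchElementException into an Option lets callers compose lookups instead of catching exceptions:

// Assuming `store` is an AppStateStore. A missing executor becomes an
// ordinary value rather than a thrown exception.
val hostPort: String = store.executorSummary("1")
  .map(_.hostPort)
  .getOrElse("<unknown>")
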
core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala
@@ -20,22 +20,11 @@ import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.exec.ExecutorsPage
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class AllExecutorListResource(ui: SparkUI) {
 
   @GET
-  def executorList(): Seq[ExecutorSummary] = {
-    val listener = ui.executorsListener
-    listener.synchronized {
-      // The follow codes should be protected by `listener` to make sure no executors will be
-      // removed before we query their status. See SPARK-12784.
-      (0 until listener.activeStorageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
-      } ++ (0 until listener.deadStorageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
-      }
-    }
-  }
+  def executorList(): Seq[ExecutorSummary] = ui.store.executorList(false)
+
 }
core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -20,21 +20,11 @@ import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.exec.ExecutorsPage
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class ExecutorListResource(ui: SparkUI) {
 
   @GET
-  def executorList(): Seq[ExecutorSummary] = {
-    val listener = ui.executorsListener
-    listener.synchronized {
-      // The follow codes should be protected by `listener` to make sure no executors will be
-      // removed before we query their status. See SPARK-12784.
-      val storageStatusList = listener.activeStorageStatusList
-      (0 until storageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
-      }
-    }
-  }
+  def executorList(): Seq[ExecutorSummary] = ui.store.executorList(true)
+
 }
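
Both resources now delegate to the store, and the boolean appears to select active-only versus all executors. A short sketch of the assumed mapping (routes follow Spark's documented REST API; the flag semantics are inferred from the two call sites above):

// GET /api/v1/applications/<appId>/executors     -> ui.store.executorList(true)   // active only
// GET /api/v1/applications/<appId>/allexecutors  -> ui.store.executorList(false)  // active + dead
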
core/src/main/scala/org/apache/spark/status/storeTypes.scala (9 additions & 0 deletions)
@@ -22,6 +22,7 @@ import java.lang.{Integer => JInteger, Long => JLong}
 import com.fasterxml.jackson.annotation.JsonIgnore
 
 import org.apache.spark.kvstore.KVIndex
+import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.status.api.v1._
 import org.apache.spark.status.KVUtils._
 
@@ -132,3 +133,11 @@ private[spark] class ExecutorStageSummaryWrapper(
   private[this] val stage: Array[Int] = Array(stageId, stageAttemptId)
 
 }
+
+/**
+ * Store raw executor events so that the executor timeline can be drawn. The event is wrapped
+ * in a container so that a monotonically increasing ID can be added to it.
+ */
+private[spark] class ExecutorEventData(
+    @KVIndexParam val id: Long,
+    val event: SparkListenerEvent)
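
A hedged read-side sketch of how the wrapped events might be turned back into timeline entries (the `timelinePoints` helper is hypothetical; `executorEvents()` is the accessor added to AppStateStore above, which iterates ExecutorEventData in id order):

import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerExecutorAdded,
  SparkListenerExecutorRemoved}

// Collect (timestamp, label) pairs for the executor timeline from the replayed
// events; events come back in id order, so the output is already chronological.
def timelinePoints(events: Seq[SparkListenerEvent]): Seq[(Long, String)] = {
  events.collect {
    case e: SparkListenerExecutorAdded => (e.time, s"executor ${e.executorId} added")
    case e: SparkListenerExecutorRemoved => (e.time, s"executor ${e.executorId} removed")
  }
}
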
core/src/main/scala/org/apache/spark/ui/SparkUI.scala (4 additions & 7 deletions)
@@ -29,7 +29,7 @@ import org.apache.spark.status.api.v1._
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.EnvironmentTab
-import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
+import org.apache.spark.ui.exec.ExecutorsTab
 import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
 import org.apache.spark.ui.scope.RDDOperationGraphListener
 import org.apache.spark.ui.storage.{StorageListener, StorageTab}
@@ -44,7 +44,6 @@ private[spark] class SparkUI private (
     val conf: SparkConf,
     securityManager: SecurityManager,
     val storageStatusListener: StorageStatusListener,
-    val executorsListener: ExecutorsListener,
     val jobProgressListener: JobProgressListener,
     val storageListener: StorageListener,
     val operationGraphListener: RDDOperationGraphListener,
@@ -66,7 +65,7 @@ private[spark] class SparkUI private (
   def initialize() {
     val jobsTab = new JobsTab(this)
     attachTab(jobsTab)
-    val stagesTab = new StagesTab(this)
+    val stagesTab = new StagesTab(this, store)
     attachTab(stagesTab)
     attachTab(new StorageTab(this))
     attachTab(new EnvironmentTab(this, store))
@@ -181,17 +180,15 @@ private[spark] object SparkUI {
     }
 
     val storageStatusListener = new StorageStatusListener(conf)
-    val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
 
     listenerBus.addListener(storageStatusListener)
-    listenerBus.addListener(executorsListener)
     listenerBus.addListener(storageListener)
     listenerBus.addListener(operationGraphListener)
 
-    new SparkUI(store, sc, conf, securityManager, storageStatusListener, executorsListener,
-      jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime)
+    new SparkUI(store, sc, conf, securityManager, storageStatusListener, jobProgressListener,
+      storageListener, operationGraphListener, appName, basePath, startTime)
   }
 
 }
core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -22,11 +22,12 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Text}
 
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.SparkContext
+import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage}
 
-private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
-
-  private val sc = parent.sc
+private[ui] class ExecutorThreadDumpPage(
+    parent: SparkUITab,
+    sc: Option[SparkContext]) extends WebUIPage("threadDump") {
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val executorId = Option(request.getParameter("executorId")).map { executorId =>
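
The page no longer reaches through ExecutorsTab for the SparkContext; the owning tab is assumed to pass it explicitly. A hypothetical wiring sketch (`sc` would be None on the history server, where no live context, and hence no thread dump, is available):

// Hypothetical attachment site inside a tab: `this` is the SparkUITab and
// `parent.sc` the Option[SparkContext] owned by the enclosing SparkUI.
attachPage(new ExecutorThreadDumpPage(this, parent.sc))
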
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala (0 additions & 154 deletions)

This file was deleted.
