Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ class SparkContext(config: SparkConf) extends Logging {

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf, listenerBus)
_statusStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l))

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
Expand All @@ -449,11 +449,7 @@ class SparkContext(config: SparkConf) extends Logging {

_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _statusStore, _conf,
l => listenerBus.addToStatusQueue(l),
_env.securityManager,
appName,
"",
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
startTime))
} else {
// For tests, do not enable the UI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,21 +316,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

val listener = if (needReplay) {
val _listener = new AppStatusListener(kvstore, conf, false)
val _listener = new AppStatusListener(kvstore, conf, false,
lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
replayBus.addListener(_listener)
Some(_listener)
} else {
None
}

val loadedUI = {
val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf,
l => replayBus.addListener(l),
secManager,
app.info.name,
val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
attempt.info.startTime.getTime(),
appSparkVersion = attempt.info.appSparkVersion)
attempt.info.appSparkVersion)
LoadedAppUI(ui)
}

Expand Down
121 changes: 110 additions & 11 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,21 @@ import org.apache.spark.scheduler._
import org.apache.spark.status.api.v1
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.scope._
import org.apache.spark.util.kvstore.KVStore

/**
* A Spark listener that writes application information to a data store. The types written to the
* store are defined in the `storeTypes.scala` file and are based on the public REST API.
*
* @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of
* unfinished tasks can be more accurately calculated (see SPARK-21922).
*/
private[spark] class AppStatusListener(
kvstore: KVStore,
conf: SparkConf,
live: Boolean) extends SparkListener with Logging {
live: Boolean,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {

import config._

Expand All @@ -50,13 +55,16 @@ private[spark] class AppStatusListener(
// operations that we can live without when rapidly processing incoming task events.
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)

// Keep track of live entities, so that task metrics can be efficiently updated (without
// causing too many writes to the underlying store, and other expensive operations).
private val liveStages = new HashMap[(Int, Int), LiveStage]()
private val liveJobs = new HashMap[Int, LiveJob]()
private val liveExecutors = new HashMap[String, LiveExecutor]()
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerLogStart(version) => sparkVersion = version
Expand Down Expand Up @@ -210,16 +218,15 @@ private[spark] class AppStatusListener(
missingStages.map(_.numTasks).sum
}

val lastStageInfo = event.stageInfos.lastOption
val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")

val jobGroup = Option(event.properties)
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }

val job = new LiveJob(
event.jobId,
lastStageName,
Some(new Date(event.time)),
if (event.time > 0) Some(new Date(event.time)) else None,
event.stageIds,
jobGroup,
numTasks)
Expand All @@ -234,17 +241,51 @@ private[spark] class AppStatusListener(
stage.jobIds += event.jobId
liveUpdate(stage, now)
}

// Create the graph data for all the job's stages.
event.stageInfos.foreach { stage =>
val graph = RDDOperationGraph.makeOperationGraph(stage, maxGraphRootNodes)
val uigraph = new RDDOperationGraphWrapper(
stage.stageId,
graph.edges,
graph.outgoingEdges,
graph.incomingEdges,
newRDDOperationCluster(graph.rootCluster))
kvstore.write(uigraph)
}
}

/**
 * Builds the wrapper representation of an RDD operation cluster.
 *
 * The cluster's id, name and child nodes are copied over as-is, while every child cluster is
 * converted through a recursive call, so the whole cluster tree ends up wrapped.
 *
 * @param cluster the cluster (and, transitively, its child clusters) to convert.
 * @return a wrapper mirroring `cluster` and all of its nested child clusters.
 */
private def newRDDOperationCluster(cluster: RDDOperationCluster): RDDOperationClusterWrapper = {
  val clusterId = cluster.id
  val clusterName = cluster.name
  val nodes = cluster.childNodes
  // Recurse into nested clusters last, mirroring the argument evaluation order of a direct call.
  val wrappedChildren = cluster.childClusters.map(child => newRDDOperationCluster(child))
  new RDDOperationClusterWrapper(clusterId, clusterName, nodes, wrappedChildren)
}

/**
 * Handles a job-end event.
 *
 * If the job is still tracked in `liveJobs` it is removed from there and finalized:
 *  - every live stage whose stage ID belongs to the job is marked as SKIPPED, counted in the
 *    job's skipped stage / task totals, dropped from `liveStages` and written to the store;
 *  - the job status is derived from the event's job result;
 *  - the completion time is recorded and the job is written to the store exactly once.
 *
 * Events for jobs that are not being tracked are ignored.
 *
 * @param event the job-end event delivered by the listener bus.
 */
override def onJobEnd(event: SparkListenerJobEnd): Unit = {
  liveJobs.remove(event.jobId).foreach { job =>
    // Use a single timestamp for every write triggered by this event.
    val now = System.nanoTime()

    // Check if there are any pending stages that match this job; mark those as skipped.
    job.stageIds.foreach { sid =>
      // `filter` produces a separate map, so removing entries from `liveStages` while
      // iterating over `pending` is safe.
      val pending = liveStages.filter { case ((id, _), _) => id == sid }
      pending.foreach { case (key, stage) =>
        stage.status = v1.StageStatus.SKIPPED
        job.skippedStages += stage.info.stageId
        job.skippedTasks += stage.info.numTasks
        liveStages.remove(key)
        update(stage, now)
      }
    }

    job.status = event.jobResult match {
      case JobSucceeded => JobExecutionStatus.SUCCEEDED
      case JobFailed(_) => JobExecutionStatus.FAILED
    }

    // A non-positive event time is treated as "no completion time available" rather than
    // being turned into a bogus date.
    job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
    update(job, now)
  }
}

Expand All @@ -262,12 +303,24 @@ private[spark] class AppStatusListener(
.toSeq
stage.jobIds = stage.jobs.map(_.jobId).toSet

stage.schedulingPool = Option(event.properties).flatMap { p =>
Option(p.getProperty("spark.scheduler.pool"))
}.getOrElse(SparkUI.DEFAULT_POOL_NAME)

stage.description = Option(event.properties).flatMap { p =>
Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}

stage.jobs.foreach { job =>
job.completedStages = job.completedStages - event.stageInfo.stageId
job.activeStages += 1
liveUpdate(job, now)
}

val pool = pools.getOrElseUpdate(stage.schedulingPool, new SchedulerPool(stage.schedulingPool))
pool.stageIds = pool.stageIds + event.stageInfo.stageId
update(pool, now)

event.stageInfo.rddInfos.foreach { info =>
if (info.storageLevel.isValid) {
liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now)
Expand All @@ -279,7 +332,7 @@ private[spark] class AppStatusListener(

override def onTaskStart(event: SparkListenerTaskStart): Unit = {
val now = System.nanoTime()
val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId)
val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId, lastUpdateTime)
liveTasks.put(event.taskInfo.taskId, task)
liveUpdate(task, now)

Expand Down Expand Up @@ -318,6 +371,8 @@ private[spark] class AppStatusListener(
val now = System.nanoTime()

val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task =>
task.info = event.taskInfo

val errorMessage = event.reason match {
case Success =>
None
Expand All @@ -337,11 +392,15 @@ private[spark] class AppStatusListener(
delta
}.orNull

val (completedDelta, failedDelta) = event.reason match {
val (completedDelta, failedDelta, killedDelta) = event.reason match {
case Success =>
(1, 0)
(1, 0, 0)
case _: TaskKilled =>
(0, 0, 1)
case _: TaskCommitDenied =>
(0, 0, 1)
case _ =>
(0, 1)
(0, 1, 0)
}

liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
Expand All @@ -350,20 +409,37 @@ private[spark] class AppStatusListener(
}
stage.activeTasks -= 1
stage.completedTasks += completedDelta
if (completedDelta > 0) {
stage.completedIndices.add(event.taskInfo.index)
}
stage.failedTasks += failedDelta
stage.killedTasks += killedDelta
if (killedDelta > 0) {
stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
}
maybeUpdate(stage, now)

// Store both stage ID and task index in a single long variable for tracking at job level.
val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
stage.jobs.foreach { job =>
job.activeTasks -= 1
job.completedTasks += completedDelta
if (completedDelta > 0) {
job.completedIndices.add(taskIndex)
}
job.failedTasks += failedDelta
job.killedTasks += killedDelta
if (killedDelta > 0) {
job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
}
maybeUpdate(job, now)
}

val esummary = stage.executorSummary(event.taskInfo.executorId)
esummary.taskTime += event.taskInfo.duration
esummary.succeededTasks += completedDelta
esummary.failedTasks += failedDelta
esummary.killedTasks += killedDelta
if (metricsDelta != null) {
esummary.metrics.update(metricsDelta)
}
Expand Down Expand Up @@ -422,6 +498,11 @@ private[spark] class AppStatusListener(
liveUpdate(job, now)
}

pools.get(stage.schedulingPool).foreach { pool =>
pool.stageIds = pool.stageIds - event.stageInfo.stageId
update(pool, now)
}

stage.executorSummaries.values.foreach(update(_, now))
update(stage, now)
}
Expand Down Expand Up @@ -482,11 +563,15 @@ private[spark] class AppStatusListener(
/**
 * Flush all live entities' data to the underlying store.
 *
 * Every live stage (together with its per-executor summaries), job, executor, task, RDD and
 * scheduler pool is written once, all with the same timestamp.
 */
def flush(): Unit = {
  val now = System.nanoTime()
  liveStages.values.foreach { stage =>
    update(stage, now)
    // Executor summaries hang off the stage, so they are flushed along with it.
    stage.executorSummaries.values.foreach(update(_, now))
  }
  liveJobs.values.foreach(update(_, now))
  liveExecutors.values.foreach(update(_, now))
  liveTasks.values.foreach(update(_, now))
  liveRDDs.values.foreach(update(_, now))
  pools.values.foreach(update(_, now))
}

private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
Expand Down Expand Up @@ -628,6 +713,20 @@ private[spark] class AppStatusListener(
stage
}

/**
 * Returns a copy of `oldSummary` with the counter for the given task end reason incremented.
 *
 * Only two kinds of reasons are counted: `TaskKilled` (keyed by its `reason` string) and
 * `TaskCommitDenied` (keyed by its error string). Any other reason leaves the summary untouched.
 *
 * @param reason     why the task ended.
 * @param oldSummary current per-reason counts.
 * @return the updated counts, or `oldSummary` itself when the reason is not counted.
 */
private def killedTasksSummary(
    reason: TaskEndReason,
    oldSummary: Map[String, Int]): Map[String, Int] = {
  // First work out which summary key (if any) this end reason contributes to.
  val summaryKey: Option[String] = reason match {
    case killed: TaskKilled => Some(killed.reason)
    case denied: TaskCommitDenied => Some(denied.toErrorString)
    case _ => None
  }

  // Then bump that key's counter, starting from zero when it has not been seen before.
  summaryKey.fold(oldSummary) { key =>
    oldSummary.updated(key, oldSummary.getOrElse(key, 0) + 1)
  }
}

/**
 * Writes the given live entity to the backing store.
 *
 * @param entity the live entity to persist.
 * @param now    timestamp forwarded to the entity's `write` call; callers in this class pass a
 *               `System.nanoTime()` reading.
 */
private def update(entity: LiveEntity, now: Long): Unit = entity.write(kvstore, now)
Expand Down
Loading