core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -44,6 +44,7 @@ import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1
+import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -314,21 +315,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }

     if (needReplay) {
+      // Disable async updates, since they cause higher memory usage, and it doesn't matter much if
+      // the listener bus is backed up when replaying logs.
+      val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false)
+      val trackingStore = new ElementTrackingStore(kvstore, replayConf)
       val replayBus = new ReplayListenerBus()
-      val listener = new AppStatusListener(kvstore, conf, false,
+      val listener = new AppStatusListener(trackingStore, replayConf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
       replayBus.addListener(listener)
       AppStatusPlugin.loadPlugins().foreach { plugin =>
-        plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
+        plugin.setupListeners(replayConf, trackingStore, l => replayBus.addListener(l), false)
       }
       try {
         val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
         replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
-        listener.flush()
+        trackingStore.close(false)
       } catch {
         case e: Exception =>
           try {
-            kvstore.close()
+            trackingStore.close()
           } catch {
             case _e: Exception => logInfo("Error closing store.", _e)
           }
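For context on the hunk above: ElementTrackingStore, introduced by this PR, wraps a KVStore, counts the elements written per type, and can run cleanup actions synchronously or asynchronously; replay disables the async path because a backed-up listener bus does not matter when reading an existing log. The explicit listener.flush() is replaced by trackingStore.close(false), which flushes listener state without closing the wrapped disk store. A minimal, self-contained sketch of that close-versus-flush behavior (an illustrative model, not the real class):

    object ReplayCloseSketch {
      trait KVStore { def close(): Unit }

      class TrackingStoreModel(parent: KVStore) {
        private var flushActions = List.empty[() => Unit]

        // Mirrors kvstore.onFlush { ... } in AppStatusListener: actions registered
        // here run when the store is flushed, which close() does first.
        def onFlush(action: () => Unit): Unit = flushActions ::= action

        // close(false): run the flush actions (so listeners persist their final
        // state) but keep the parent store open, since the caller still owns it.
        def close(closeParent: Boolean = true): Unit = {
          flushActions.reverse.foreach(_.apply())
          if (closeParent) parent.close()
        }
      }
    }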
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -217,11 +217,6 @@ package object config {
     .stringConf
     .createOptional

-  // To limit memory usage, we only track information for a fixed number of tasks
-  private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
-    .intConf
-    .createWithDefault(100000)
-
   // To limit how many applications are shown in the History Server summary ui
   private[spark] val HISTORY_UI_MAX_APPS =
     ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
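The spark.ui.retainedTasks key itself does not go away: the AppStatusListener changes below read per-type limits such as MAX_RETAINED_TASKS_PER_STAGE, MAX_RETAINED_JOBS, MAX_RETAINED_STAGES and MAX_RETAINED_DEAD_EXECUTORS from org.apache.spark.status.config instead of this file. A sketch of what those constants presumably look like, using the same ConfigBuilder pattern as above; the exact keys and defaults (other than retainedTasks, whose key and default move over from the entry deleted here) are assumptions:

    private[spark] val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
      .intConf
      .createWithDefault(1000)

    private[spark] val MAX_RETAINED_TASKS_PER_STAGE = ConfigBuilder("spark.ui.retainedTasks")
      .intConf
      .createWithDefault(100000)

    private[spark] val MAX_RETAINED_DEAD_EXECUTORS = ConfigBuilder("spark.ui.retainedDeadExecutors")
      .intConf
      .createWithDefault(100)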
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala (167 additions, 12 deletions)
@@ -32,7 +32,6 @@ import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.scope._
-import org.apache.spark.util.kvstore.KVStore

 /**
  * A Spark listener that writes application information to a data store. The types written to the
@@ -42,7 +41,7 @@ import org.apache.spark.util.kvstore.KVStore
  * unfinished tasks can be more accurately calculated (see SPARK-21922).
  */
 private[spark] class AppStatusListener(
-    kvstore: KVStore,
+    kvstore: ElementTrackingStore,
     conf: SparkConf,
     live: Boolean,
     lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
@@ -51,13 +50,15 @@ private[spark] class AppStatusListener(

   private var sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
+  private var appSummary = new AppSummary(0, 0)
   private var coresPerTask: Int = 1

   // How often to update live entities. -1 means "never update" when replaying applications,
   // meaning only the last write will happen. For live applications, this avoids a few
   // operations that we can live without when rapidly processing incoming task events.
   private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

+  private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
   private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)

   // Keep track of live entities, so that task metrics can be efficiently updated (without
@@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
+  // Keep the active executor count as a separate variable to avoid having to do synchronization
+  // around liveExecutors.
+  @volatile private var activeExecutorCount = 0

-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case SparkListenerLogStart(version) => sparkVersion = version
-    case _ =>
+  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
+    { count => cleanupExecutors(count) }
+
+  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
+    cleanupJobs(count)
+  }
+
+  kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
+    cleanupStages(count)
+  }
+
+  kvstore.onFlush {
+    if (!live) {
+      flush()
+    }
   }

   override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
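These three addTrigger registrations are the heart of the new cleanup mechanism: each attaches a per-type threshold, and after a write of that type the store calls the action with the current element count; onFlush actions run once when the store is flushed, which is how the replay path gets its final writes. A small, self-contained model of that contract (names beyond those in the diff are illustrative, not Spark's API):

    class TriggerModel {
      private case class Trigger(threshold: Long, action: Long => Unit)
      private val triggers = scala.collection.mutable.Map.empty[Class[_], Trigger]
      private val counts =
        scala.collection.mutable.Map.empty[Class[_], Long].withDefaultValue(0L)

      // Register a cleanup action for a type, as kvstore.addTrigger does above.
      def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit =
        triggers(klass) = Trigger(threshold, action)

      // After each write, fire the trigger once the count exceeds the threshold.
      def write(value: AnyRef): Unit = {
        val k = value.getClass
        counts(k) += 1
        triggers.get(k).foreach { t =>
          if (counts(k) > t.threshold) t.action(counts(k))
        }
      }
    }

    val store = new TriggerModel
    store.addTrigger(classOf[String], 2) { count => println(s"cleanup at count=$count") }
    Seq("a", "b", "c").foreach(store.write)  // fires once, at count = 3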
@@ -97,6 +113,7 @@
       Seq(attempt))

     kvstore.write(new ApplicationInfoWrapper(appInfo))
+    kvstore.write(appSummary)
   }

   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
@@ -157,10 +174,11 @@
   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
     liveExecutors.remove(event.executorId).foreach { exec =>
       val now = System.nanoTime()
+      activeExecutorCount = math.max(0, activeExecutorCount - 1)
       exec.isActive = false
       exec.removeTime = new Date(event.time)
       exec.removeReason = event.reason
-      update(exec, now)
+      update(exec, now, last = true)

       // Remove all RDD distributions that reference the removed executor, in case there wasn't
       // a corresponding event.
@@ -289,8 +307,11 @@
       }

       job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
-      update(job, now)
+      update(job, now, last = true)
     }
+
+    appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
+    kvstore.write(appSummary)
   }

   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
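AppSummary is a new, tiny entity written on every job (and, below, stage) completion, so the UI can display completed counts without iterating over all stored jobs. Its presumed shape, consistent with how AppStatusStore reads it back further down; the field names come from this diff, while the singleton key layout is an assumption:

    // Presumed sketch; stored as a singleton keyed by the class name, matching
    // store.read(classOf[AppSummary], classOf[AppSummary].getName()) below.
    private[spark] class AppSummary(
        val numCompletedJobs: Int,
        val numCompletedStages: Int) {
      def id: String = getClass.getName
    }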
@@ -349,6 +370,13 @@
         job.activeTasks += 1
         maybeUpdate(job, now)
       }
+
+      if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) {
+        stage.cleaning = true
+        kvstore.doAsync {
+          cleanupTasks(stage)
+        }
+      }
     }

     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
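Note the guard around the async cleanup: savedTasks counts the tasks written for this stage, and the cleaning flag ensures at most one cleanup is queued per stage at a time (cleanupTasks resets it when done; see the new method near the end of this file). A compact, runnable model of the pattern, with a plain ExecutionContext standing in for the store's async executor:

    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{ExecutionContext, Future}

    class StageCleanupGuard(maxTasksPerStage: Int)(implicit ec: ExecutionContext) {
      val savedTasks = new AtomicInteger(0)  // incremented on every task write
      @volatile var cleaning = false         // true while a cleanup is queued or running

      def onTaskWritten(cleanupTasks: () => Unit): Unit = {
        if (savedTasks.incrementAndGet() > maxTasksPerStage && !cleaning) {
          cleaning = true
          Future {              // stands in for kvstore.doAsync { ... }
            cleanupTasks()
            cleaning = false    // the real code resets the flag inside cleanupTasks()
          }
        }
      }
    }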
@@ -448,6 +476,13 @@
         esummary.metrics.update(metricsDelta)
       }
       maybeUpdate(esummary, now)
+
+      if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
+        stage.cleaning = true
+        kvstore.doAsync {
+          cleanupTasks(stage)
+        }
+      }
     }

     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -509,8 +544,11 @@
       }

       stage.executorSummaries.values.foreach(update(_, now))
-      update(stage, now)
+      update(stage, now, last = true)
     }
+
+    appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
+    kvstore.write(appSummary)
   }

   override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
@@ -566,7 +604,7 @@
   }

   /** Flush all live entities' data to the underlying store. */
-  def flush(): Unit = {
+  private def flush(): Unit = {
     val now = System.nanoTime()
     liveStages.values.asScala.foreach { stage =>
       update(stage, now)
@@ -720,7 +758,10 @@
   }

   private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
-    liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime))
+    liveExecutors.getOrElseUpdate(executorId, {
+      activeExecutorCount += 1
+      new LiveExecutor(executorId, addTime)
+    })
   }

   private def getOrCreateStage(info: StageInfo): LiveStage = {
Expand All @@ -746,8 +787,8 @@ private[spark] class AppStatusListener(
}
}

private def update(entity: LiveEntity, now: Long): Unit = {
entity.write(kvstore, now)
private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
entity.write(kvstore, now, checkTriggers = last)
}

/** Update a live entity only if it hasn't been updated in the last configured period. */
@@ -764,4 +805,118 @@
     }
   }

+  private def cleanupExecutors(count: Long): Unit = {
+    // Because the limit is on the number of *dead* executors, we need to calculate whether
+    // there are actually enough dead executors to be deleted.
+    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
+    val dead = count - activeExecutorCount
+
+    if (dead > threshold) {
+      val countToDelete = calculateNumberToRemove(dead, threshold)
+      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+        .max(countToDelete).first(false).last(false).asScala.toSeq
+      toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
+    }
+  }
+
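A worked example of the dead-executor arithmetic above, assuming the retained threshold resolves to 100: the trigger passes the total number of ExecutorSummaryWrapper rows in the store, so the live count has to be subtracted first.

    val count = 160L           // all executor summaries currently in the store
    val active = 25L           // activeExecutorCount maintained by this listener
    val dead = count - active  // 135 dead executors actually tracked
    // calculateNumberToRemove(135, 100) = math.max(100 / 10, 135 - 100) = 35 deletions,
    // scanned from the "active" index bounded by first(false).last(false), i.e. dead rows only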
+  private def cleanupJobs(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
+        countToDelete.toInt) { j =>
+      j.info.status == JobExecutionStatus.SUCCEEDED || j.info.status == JobExecutionStatus.FAILED
+    }
+    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
+  }
+
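KVUtils.viewToSeq, used here and in the stage and task cleanups below, is taken to mean "collect up to max elements of the view that pass the filter"; a one-line model of that assumed contract:

    // Model only; the real helper in org.apache.spark.status.KVUtils operates on a KVStoreView.
    def viewToSeq[T](view: Iterable[T], max: Int)(filter: T => Boolean): Seq[T] =
      view.iterator.filter(filter).take(max).toSeq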
+  private def cleanupStages(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
+        countToDelete.toInt) { s =>
+      s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
+    }
+
+    stages.foreach { s =>
+      val key = s.id
+      kvstore.delete(s.getClass(), key)
+
+      val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+        .toSeq
+      execSummaries.foreach { e =>
+        kvstore.delete(e.getClass(), e.id)
+      }
+
+      val tasks = kvstore.view(classOf[TaskDataWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+
+      tasks.foreach { t =>
+        kvstore.delete(t.getClass(), t.info.taskId)
+      }
+
+      // Check whether there are remaining attempts for the same stage. If there aren't, then
+      // also delete the RDD graph data.
+      val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
+        .index("stageId")
+        .first(s.stageId)
+        .last(s.stageId)
+        .closeableIterator()
+
+      val hasMoreAttempts = try {
+        remainingAttempts.asScala.exists { other =>
+          other.info.attemptId != s.info.attemptId
+        }
+      } finally {
+        remainingAttempts.close()
+      }
+
+      if (!hasMoreAttempts) {
+        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+      }
+    }
+  }
+
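Deleting a stage cascades through everything keyed by the same attempt. The "stage" index on executor summaries and tasks appears to be keyed by Array(stageId, attemptId), matching the stageKey built in cleanupTasks below, so first(key).last(key) pins the scan to exactly one attempt; the RDD operation graph is only dropped together with the last remaining attempt of that stage id. Illustrative key values:

    // Rows for stage 2, attempt 0 only; first(k).last(k) bounds both ends of the
    // indexed scan to the same composite key (values are made up for illustration).
    val key = Array(2, 0)
    // kvstore.view(classOf[TaskDataWrapper]).index("stage").first(key).last(key)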
+  private def cleanupTasks(stage: LiveStage): Unit = {
+    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage)
+    if (countToDelete > 0L) {
+      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
+        .last(stageKey)
+
+      // On live applications, try to delete finished tasks only; when in the SHS, treat all
+      // tasks as the same.
+      val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
+        !live || t.info.status != TaskState.RUNNING.toString()
+      }
+      toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+      stage.savedTasks.addAndGet(-toDelete.size)
+    }
+    stage.cleaning = false
+  }
+
+  /**
+   * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
+   * asynchronously, this method may return 0 in case enough items have been deleted already.
+   */
+  private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
+    if (dataSize > retainedSize) {
+      math.max(retainedSize / 10L, dataSize - retainedSize)
+    } else {
+      0L
+    }
+  }
+
 }
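The 10% batch size gives the cleanups hysteresis: without it, a store sitting exactly at its limit would delete a single element on every subsequent write. For example, with a retained limit of 1000:

    calculateNumberToRemove(1001, 1000)  // math.max(1000 / 10, 1) = 100: delete a batch, then idle
    calculateNumberToRemove(950, 1000)   // 0: an async trigger may run after an earlier
                                         // cleanup already brought the count under the limit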
core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala
@@ -48,7 +48,7 @@ private[spark] trait AppStatusPlugin {
    */
   def setupListeners(
       conf: SparkConf,
-      store: KVStore,
+      store: ElementTrackingStore,
       addListenerFn: SparkListener => Unit,
       live: Boolean): Unit

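A hypothetical plugin against the widened signature, to show why the parameter type changed: with an ElementTrackingStore, a plugin can bound its own element types the same way AppStatusListener now does. MyDataWrapper and the threshold are made up, and setupUI is assumed to be the trait's other, unchanged method:

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.status.{AppStatusPlugin, ElementTrackingStore}
    import org.apache.spark.ui.SparkUI

    class MyStatusPlugin extends AppStatusPlugin {
      private class MyDataWrapper  // hypothetical element type written by this plugin

      override def setupListeners(
          conf: SparkConf,
          store: ElementTrackingStore,
          addListenerFn: SparkListener => Unit,
          live: Boolean): Unit = {
        // Bound this plugin's rows just like the core listener bounds jobs and stages.
        store.addTrigger(classOf[MyDataWrapper], 1000) { count =>
          // delete the oldest (count - 1000) MyDataWrapper rows here
          ()
        }
      }

      override def setupUI(ui: SparkUI): Unit = ()
    }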
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -323,6 +323,10 @@ private[spark] class AppStatusStore(
     store.read(classOf[PoolData], name)
   }

+  def appSummary(): AppSummary = {
+    store.read(classOf[AppSummary], classOf[AppSummary].getName())
+  }
+
   def close(): Unit = {
     store.close()
   }
@@ -340,7 +344,7 @@
    * @param addListenerFn Function to register a listener with a bus.
    */
   def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
-    val store = new InMemoryStore()
+    val store = new ElementTrackingStore(new InMemoryStore(), conf)
     val listener = new AppStatusListener(store, conf, true)
     addListenerFn(listener)
     AppStatusPlugin.loadPlugins().foreach(_.setupListeners(conf, store, addListenerFn, true))
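With this change the live path and the replay path converge on the same ElementTrackingStore wrapper, so the triggers registered by AppStatusListener bound memory in both. A sketch of the live wiring; the addToStatusQueue registration is an assumption about how SparkContext hooks status listeners up to its LiveListenerBus:

    import org.apache.spark.SparkConf
    import org.apache.spark.status.AppStatusStore

    val conf = new SparkConf()
    // `bus` would be SparkContext's LiveListenerBus (assumed call site):
    val statusStore = AppStatusStore.createLiveStore(conf, listener => bus.addToStatusQueue(listener))
    // The UI can then read the pre-aggregated counters written by AppStatusListener:
    val summary = statusStore.appSummary()
    println(s"${summary.numCompletedJobs} jobs / ${summary.numCompletedStages} stages completed")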