@@ -19,6 +19,7 @@ package org.apache.spark.status

import java.util.Date

+import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark._
@@ -29,7 +30,6 @@ import org.apache.spark.status.api.v1
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.scope._
-import org.apache.spark.util.kvstore.KVStore

/**
 * A Spark listener that writes application information to a data store. The types written to the
@@ -39,7 +39,7 @@ import org.apache.spark.util.kvstore.KVStore
 * unfinished tasks can be more accurately calculated (see SPARK-21922).
 */
private[spark] class AppStatusListener(
-    kvstore: KVStore,
+    kvstore: ElementTrackingStore,
    conf: SparkConf,
    live: Boolean,
    lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
@@ -48,13 +48,15 @@ private[spark] class AppStatusListener(

  private var sparkVersion = SPARK_VERSION
  private var appInfo: v1.ApplicationInfo = null
+  private var appSummary = new AppSummary(0, 0)
  private var coresPerTask: Int = 1

  // How often to update live entities. -1 means "never update" when replaying applications,
  // meaning only the last write will happen. For live applications, this avoids a few
  // operations that we can live without when rapidly processing incoming task events.
  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

+  private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
  private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)

  // Keep track of live entities, so that task metrics can be efficiently updated (without
@@ -65,10 +67,25 @@ private[spark] class AppStatusListener(
  private val liveTasks = new HashMap[Long, LiveTask]()
  private val liveRDDs = new HashMap[Int, LiveRDD]()
  private val pools = new HashMap[String, SchedulerPool]()
+  // Keep the active executor count as a separate variable to avoid having to do synchronization
+  // around liveExecutors.
+  @volatile private var activeExecutorCount = 0

-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case SparkListenerLogStart(version) => sparkVersion = version
-    case _ =>
+  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
+    { count => cleanupExecutors(count) }
+
+  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
+    cleanupJobs(count)
+  }
+
+  kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
+    cleanupStages(count)
+  }
+
+  kvstore.onFlush {
+    if (!live) {
+      flush()
+    }
  }

  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
@@ -94,6 +111,7 @@ private[spark] class AppStatusListener(
      Seq(attempt))

    kvstore.write(new ApplicationInfoWrapper(appInfo))
+    kvstore.write(appSummary)
  }

  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
@@ -155,10 +173,11 @@ private[spark] class AppStatusListener(
  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    liveExecutors.remove(event.executorId).foreach { exec =>
      val now = System.nanoTime()
+      activeExecutorCount = math.max(0, activeExecutorCount - 1)
      exec.isActive = false
      exec.removeTime = new Date(event.time)
      exec.removeReason = event.reason
-      update(exec, now)
+      update(exec, now, last = true)

      // Remove all RDD distributions that reference the removed executor, in case there wasn't
      // a corresponding event.
@@ -285,8 +304,11 @@ private[spark] class AppStatusListener(
      }

      job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
-      update(job, now)
+      update(job, now, last = true)
    }
+
+    appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
+    kvstore.write(appSummary)
  }

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@@ -345,6 +367,13 @@ private[spark] class AppStatusListener(
        job.activeTasks += 1
        maybeUpdate(job, now)
      }
+
+      if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) {
+        stage.cleaning = true
+        kvstore.doAsync {
+          cleanupTasks(stage)
+        }
+      }
    }

    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -444,6 +473,13 @@ private[spark] class AppStatusListener(
        esummary.metrics.update(metricsDelta)
      }
      maybeUpdate(esummary, now)
+
+      if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
+        stage.cleaning = true
+        kvstore.doAsync {
+          cleanupTasks(stage)
+        }
+      }
    }

    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -504,8 +540,11 @@ private[spark] class AppStatusListener(
      }

      stage.executorSummaries.values.foreach(update(_, now))
-      update(stage, now)
+      update(stage, now, last = true)
    }
+
+    appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
+    kvstore.write(appSummary)
  }

  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
@@ -561,7 +600,7 @@ private[spark] class AppStatusListener(
  }

  /** Flush all live entities' data to the underlying store. */
-  def flush(): Unit = {
+  private def flush(): Unit = {
    val now = System.nanoTime()
    liveStages.values.foreach { stage =>
      update(stage, now)
@@ -684,7 +723,10 @@ private[spark] class AppStatusListener(
  }

  private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
-    liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime))
+    liveExecutors.getOrElseUpdate(executorId, {
+      activeExecutorCount += 1
+      new LiveExecutor(executorId, addTime)
+    })
  }

  private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
@@ -727,8 +769,8 @@ private[spark] class AppStatusListener(
    }
  }

-  private def update(entity: LiveEntity, now: Long): Unit = {
-    entity.write(kvstore, now)
+  private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
+    entity.write(kvstore, now, checkTriggers = last)
  }

  /** Update a live entity only if it hasn't been updated in the last configured period. */
@@ -745,4 +787,118 @@ private[spark] class AppStatusListener(
    }
  }

+  private def cleanupExecutors(count: Long): Unit = {
+    // Because the limit is on the number of *dead* executors, we need to calculate whether
+    // there are actually enough dead executors to be deleted.
+    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
+    val dead = count - activeExecutorCount
+
+    if (dead > threshold) {
+      val countToDelete = calculateNumberToRemove(dead, threshold)
+      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+        .max(countToDelete).first(false).last(false).asScala.toSeq
+      toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
+    }
+  }
+
+  private def cleanupJobs(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
+        countToDelete.toInt) { j =>
+      j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
+    }
+    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
+  }
+
+  private def cleanupStages(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
+        countToDelete.toInt) { s =>
+      s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
+    }
+
+    stages.foreach { s =>
+      val key = s.id
+      kvstore.delete(s.getClass(), key)
+
+      val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+        .toSeq
+      execSummaries.foreach { e =>
+        kvstore.delete(e.getClass(), e.id)
+      }
+
+      val tasks = kvstore.view(classOf[TaskDataWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+
+      tasks.foreach { t =>
+        kvstore.delete(t.getClass(), t.info.taskId)
+      }
+
+      // Check whether there are remaining attempts for the same stage. If there aren't, then
+      // also delete the RDD graph data.
+      val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
+        .index("stageId")
+        .first(s.stageId)
+        .last(s.stageId)
+        .closeableIterator()
+
+      val hasMoreAttempts = try {
+        remainingAttempts.asScala.exists { other =>
+          other.info.attemptId != s.info.attemptId
+        }
+      } finally {
+        remainingAttempts.close()
+      }
+
+      if (!hasMoreAttempts) {
+        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+      }
+    }
+  }
+
+  private def cleanupTasks(stage: LiveStage): Unit = {
+    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage)
+    if (countToDelete > 0L) {
+      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
+        .last(stageKey)
+
+      // On live applications, try to delete finished tasks only; when in the SHS, treat all
+      // tasks as the same.
+      val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
+        !live || t.info.status != TaskState.RUNNING.toString()
+      }
+      toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+      stage.savedTasks.addAndGet(-toDelete.size)
+    }
+    stage.cleaning = false
+  }
+
+  /**
+   * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
+   * asynchronously, this method may return 0 in case enough items have been deleted already.
+   */
+  private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
+    if (dataSize > retainedSize) {
+      math.max(retainedSize / 10L, dataSize - retainedSize)
+    } else {
+      0L
+    }
+  }
+
}