Skip to content

Commit 1c70da3

Browse files
Marcelo Vanzin authored and cloud-fan committed
[SPARK-20657][CORE] Speed up rendering of the stages page.
There are two main changes to speed up rendering of the tasks list when rendering the stage page.

The first one makes the code only load the tasks being shown in the current page of the tasks table, and information related to only those tasks. One side-effect of this change is that the graph that shows task-related events now only shows events for the tasks in the current page, instead of the previously hardcoded limit of "events for the first 1000 tasks". That ends up helping with readability, though.

To make sorting efficient when using a disk store, the task wrapper was extended to include many new indices, one for each of the sortable columns in the UI, and metrics for which quantiles are calculated.

The second changes the way metric quantiles are calculated for stages. Instead of using the "Distribution" class to process data for all task metrics, which requires scanning all tasks of a stage, the code now uses the KVStore "skip()" functionality to only read tasks that contain interesting information for the quantiles that are desired.

This is still not cheap; because there are many metrics that the UI and API track, the code needs to scan the index for each metric to gather the information. Savings come mainly from skipping deserialization when using the disk store, but the in-memory code also seems to be faster than before (most probably because of other changes in this patch).

To make subsequent calls faster, some quantiles are cached in the status store. This makes UIs much faster after the first time a stage has been loaded.

With the above changes, a lot of code in the UI layer could be simplified.

Author: Marcelo Vanzin <[email protected]>

Closes #20013 from vanzin/SPARK-20657.
1 parent 87c98de commit 1c70da3

File tree

18 files changed

+1361
-986
lines changed

18 files changed

+1361
-986
lines changed

common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
8383
if (versionData != null) {
8484
long version = serializer.deserializeLong(versionData);
8585
if (version != STORE_VERSION) {
86+
close();
8687
throw new UnsupportedStoreVersionException();
8788
}
8889
} else {

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 42 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -377,6 +377,10 @@ private[spark] class AppStatusListener(
377377
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
378378
stage.activeTasks += 1
379379
stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
380+
381+
val locality = event.taskInfo.taskLocality.toString()
382+
val count = stage.localitySummary.getOrElse(locality, 0L) + 1L
383+
stage.localitySummary = stage.localitySummary ++ Map(locality -> count)
380384
maybeUpdate(stage, now)
381385

382386
stage.jobs.foreach { job =>
@@ -433,7 +437,7 @@ private[spark] class AppStatusListener(
433437
}
434438
task.errorMessage = errorMessage
435439
val delta = task.updateMetrics(event.taskMetrics)
436-
update(task, now)
440+
update(task, now, last = true)
437441
delta
438442
}.orNull
439443

@@ -450,7 +454,7 @@ private[spark] class AppStatusListener(
450454

451455
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
452456
if (metricsDelta != null) {
453-
stage.metrics.update(metricsDelta)
457+
stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, metricsDelta)
454458
}
455459
stage.activeTasks -= 1
456460
stage.completedTasks += completedDelta
@@ -486,7 +490,7 @@ private[spark] class AppStatusListener(
486490
esummary.failedTasks += failedDelta
487491
esummary.killedTasks += killedDelta
488492
if (metricsDelta != null) {
489-
esummary.metrics.update(metricsDelta)
493+
esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
490494
}
491495
maybeUpdate(esummary, now)
492496

@@ -604,11 +608,11 @@ private[spark] class AppStatusListener(
604608
maybeUpdate(task, now)
605609

606610
Option(liveStages.get((sid, sAttempt))).foreach { stage =>
607-
stage.metrics.update(delta)
611+
stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, delta)
608612
maybeUpdate(stage, now)
609613

610614
val esummary = stage.executorSummary(event.execId)
611-
esummary.metrics.update(delta)
615+
esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, delta)
612616
maybeUpdate(esummary, now)
613617
}
614618
}
@@ -690,7 +694,7 @@ private[spark] class AppStatusListener(
690694
// can update the executor information too.
691695
liveRDDs.get(block.rddId).foreach { rdd =>
692696
if (updatedStorageLevel.isDefined) {
693-
rdd.storageLevel = updatedStorageLevel.get
697+
rdd.setStorageLevel(updatedStorageLevel.get)
694698
}
695699

696700
val partition = rdd.partition(block.name)
@@ -814,7 +818,7 @@ private[spark] class AppStatusListener(
814818

815819
/** Update a live entity only if it hasn't been updated in the last configured period. */
816820
private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
817-
if (liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
821+
if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
818822
update(entity, now)
819823
}
820824
}
@@ -865,7 +869,7 @@ private[spark] class AppStatusListener(
865869
}
866870

867871
stages.foreach { s =>
868-
val key = s.id
872+
val key = Array(s.info.stageId, s.info.attemptId)
869873
kvstore.delete(s.getClass(), key)
870874

871875
val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
@@ -885,15 +889,15 @@ private[spark] class AppStatusListener(
885889
.asScala
886890

887891
tasks.foreach { t =>
888-
kvstore.delete(t.getClass(), t.info.taskId)
892+
kvstore.delete(t.getClass(), t.taskId)
889893
}
890894

891895
// Check whether there are remaining attempts for the same stage. If there aren't, then
892896
// also delete the RDD graph data.
893897
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
894898
.index("stageId")
895-
.first(s.stageId)
896-
.last(s.stageId)
899+
.first(s.info.stageId)
900+
.last(s.info.stageId)
897901
.closeableIterator()
898902

899903
val hasMoreAttempts = try {
@@ -905,8 +909,10 @@ private[spark] class AppStatusListener(
905909
}
906910

907911
if (!hasMoreAttempts) {
908-
kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
912+
kvstore.delete(classOf[RDDOperationGraphWrapper], s.info.stageId)
909913
}
914+
915+
cleanupCachedQuantiles(key)
910916
}
911917
}
912918

@@ -919,9 +925,9 @@ private[spark] class AppStatusListener(
919925

920926
// Try to delete finished tasks only.
921927
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
922-
!live || t.info.status != TaskState.RUNNING.toString()
928+
!live || t.status != TaskState.RUNNING.toString()
923929
}
924-
toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
930+
toDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
925931
stage.savedTasks.addAndGet(-toDelete.size)
926932

927933
// If there are more running tasks than the configured limit, delete running tasks. This
@@ -930,13 +936,34 @@ private[spark] class AppStatusListener(
930936
val remaining = countToDelete - toDelete.size
931937
if (remaining > 0) {
932938
val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
933-
runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
939+
runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
934940
stage.savedTasks.addAndGet(-remaining)
935941
}
942+
943+
// On live applications, cleanup any cached quantiles for the stage. This makes sure that
944+
// quantiles will be recalculated after tasks are replaced with newer ones.
945+
//
946+
// This is not needed in the SHS since caching only happens after the event logs are
947+
// completely processed.
948+
if (live) {
949+
cleanupCachedQuantiles(stageKey)
950+
}
936951
}
937952
stage.cleaning = false
938953
}
939954

955+
private def cleanupCachedQuantiles(stageKey: Array[Int]): Unit = {
956+
val cachedQuantiles = kvstore.view(classOf[CachedQuantile])
957+
.index("stage")
958+
.first(stageKey)
959+
.last(stageKey)
960+
.asScala
961+
.toList
962+
cachedQuantiles.foreach { q =>
963+
kvstore.delete(q.getClass(), q.id)
964+
}
965+
}
966+
940967
/**
941968
* Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
942969
* asynchronously, this method may return 0 in case enough items have been deleted already.

0 commit comments

Comments
 (0)