@@ -83,6 +83,7 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
     if (versionData != null) {
       long version = serializer.deserializeLong(versionData);
       if (version != STORE_VERSION) {
+        close();
         throw new UnsupportedStoreVersionException();
       }
     } else {
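The one-line addition above closes the store before throwing, presumably so a version mismatch does not leave the LevelDB handle open when construction fails. A minimal Scala sketch of that close-before-throw pattern; `VersionedStore` and its version constant are illustrative, not part of the PR:

```scala
// Sketch: release the underlying resource before signalling an
// unsupported version, so the caller is not left holding it open.
// VersionedStore and SupportedVersion are hypothetical names.
class VersionedStore(storedVersion: Long) extends AutoCloseable {
  private val SupportedVersion = 1L
  private var closed = false

  def checkVersion(): Unit = {
    if (storedVersion != SupportedVersion) {
      close() // free the handle first...
      throw new IllegalStateException(s"unsupported store version: $storedVersion")
    }
  }

  override def close(): Unit = closed = true
}
```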
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala (57 changes: 42 additions & 15 deletions)
@@ -377,6 +377,10 @@ private[spark] class AppStatusListener(
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       stage.activeTasks += 1
       stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
+
+      val locality = event.taskInfo.taskLocality.toString()
+      val count = stage.localitySummary.getOrElse(locality, 0L) + 1L
+      stage.localitySummary = stage.localitySummary ++ Map(locality -> count)
       maybeUpdate(stage, now)
 
       stage.jobs.foreach { job =>
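The added lines keep a running per-locality task count in an immutable map, rebuilding the map with one counter bumped on every task start. A standalone sketch of that counting pattern; the object and method names are illustrative, not the listener's actual state:

```scala
// Sketch: immutable per-locality counters, mirroring the
// localitySummary update above. Names are illustrative.
object LocalitySketch {
  def increment(summary: Map[String, Long], locality: String): Map[String, Long] =
    summary ++ Map(locality -> (summary.getOrElse(locality, 0L) + 1L))

  def main(args: Array[String]): Unit = {
    val counts = Seq("PROCESS_LOCAL", "NODE_LOCAL", "PROCESS_LOCAL")
      .foldLeft(Map.empty[String, Long])(increment)
    println(counts) // Map(PROCESS_LOCAL -> 2, NODE_LOCAL -> 1)
  }
}
```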
@@ -433,7 +437,7 @@ private[spark] class AppStatusListener(
       }
       task.errorMessage = errorMessage
       val delta = task.updateMetrics(event.taskMetrics)
-      update(task, now)
+      update(task, now, last = true)
       delta
     }.orNull

@@ -450,7 +454,7 @@ private[spark] class AppStatusListener(

     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       if (metricsDelta != null) {
-        stage.metrics.update(metricsDelta)
+        stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, metricsDelta)
       }
       stage.activeTasks -= 1
       stage.completedTasks += completedDelta
@@ -486,7 +490,7 @@ private[spark] class AppStatusListener(
       esummary.failedTasks += failedDelta
       esummary.killedTasks += killedDelta
       if (metricsDelta != null) {
-        esummary.metrics.update(metricsDelta)
+        esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
       maybeUpdate(esummary, now)

@@ -604,11 +608,11 @@ private[spark] class AppStatusListener(
     maybeUpdate(task, now)
 
     Option(liveStages.get((sid, sAttempt))).foreach { stage =>
-      stage.metrics.update(delta)
+      stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, delta)
       maybeUpdate(stage, now)
 
       val esummary = stage.executorSummary(event.execId)
-      esummary.metrics.update(delta)
+      esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, delta)
       maybeUpdate(esummary, now)
     }
   }
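The recurring change in these hunks swaps in-place `metrics.update(delta)` for `LiveEntityHelpers.addMetrics`, which returns a fresh aggregate instead of mutating one that readers may still hold. A minimal sketch of that additive-merge idea; the two-field `Metrics` type here is hypothetical, not Spark's:

```scala
// Sketch of an additive metrics merge in the style of
// LiveEntityHelpers.addMetrics: build a new aggregate rather than
// mutating the existing one. The Metrics type is illustrative.
case class Metrics(executorRunTime: Long, recordsRead: Long)

object MetricsSketch {
  def addMetrics(m1: Metrics, m2: Metrics): Metrics =
    Metrics(
      m1.executorRunTime + m2.executorRunTime,
      m1.recordsRead + m2.recordsRead)

  def main(args: Array[String]): Unit = {
    val total = addMetrics(Metrics(100L, 10L), Metrics(50L, 5L))
    println(total) // Metrics(150,15)
  }
}
```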
@@ -690,7 +694,7 @@ private[spark] class AppStatusListener(
       // can update the executor information too.
       liveRDDs.get(block.rddId).foreach { rdd =>
         if (updatedStorageLevel.isDefined) {
-          rdd.storageLevel = updatedStorageLevel.get
+          rdd.setStorageLevel(updatedStorageLevel.get)
         }
 
         val partition = rdd.partition(block.name)
@@ -814,7 +818,7 @@ private[spark] class AppStatusListener(

   /** Update a live entity only if it hasn't been updated in the last configured period. */
   private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
-    if (liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
+    if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
       update(entity, now)
     }
   }
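With the added `live &&` guard, throttled incremental writes apply only to live applications; as the doc comment says, the write is skipped unless the entity has gone unwritten for longer than the configured period. A self-contained sketch of that last-write-time check; the class and field names are assumptions, not Spark's:

```scala
// Sketch: write an entity at most once per period, in the spirit of
// maybeUpdate above. All names here are illustrative.
class ThrottledWriter(live: Boolean, updatePeriodNs: Long) {
  private var lastWriteTime = -1L

  def maybeWrite(now: Long)(write: => Unit): Unit = {
    if (live && updatePeriodNs >= 0 && now - lastWriteTime > updatePeriodNs) {
      write
      lastWriteTime = now
    }
  }
}

object ThrottledWriterDemo extends App {
  val writer = new ThrottledWriter(live = true, updatePeriodNs = 100000000L) // 100ms
  writer.maybeWrite(System.nanoTime()) { println("first write goes through") }
  writer.maybeWrite(System.nanoTime()) { println("this does not print within the period") }
}
```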
@@ -865,7 +869,7 @@ private[spark] class AppStatusListener(
     }
 
     stages.foreach { s =>
-      val key = s.id
+      val key = Array(s.info.stageId, s.info.attemptId)
       kvstore.delete(s.getClass(), key)
 
       val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
@@ -885,15 +889,15 @@ private[spark] class AppStatusListener(
         .asScala
 
       tasks.foreach { t =>
-        kvstore.delete(t.getClass(), t.info.taskId)
+        kvstore.delete(t.getClass(), t.taskId)
       }
 
       // Check whether there are remaining attempts for the same stage. If there aren't, then
       // also delete the RDD graph data.
       val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
         .index("stageId")
-        .first(s.stageId)
-        .last(s.stageId)
+        .first(s.info.stageId)
+        .last(s.info.stageId)
         .closeableIterator()
 
       val hasMoreAttempts = try {
@@ -905,8 +909,10 @@ private[spark] class AppStatusListener(
       }
 
       if (!hasMoreAttempts) {
-        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+        kvstore.delete(classOf[RDDOperationGraphWrapper], s.info.stageId)
       }
+
+      cleanupCachedQuantiles(key)
     }
   }
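Stage-scoped data is now addressed by a composite key, `Array(stageId, attemptId)`, while cross-attempt lookups bound an index with `first(x).last(x)` over just the stage id. As a rough stand-in for the KVStore scan above, here is a plain-Scala sketch of the "are there remaining attempts?" decision; the types are simplified stand-ins, not the real wrappers:

```scala
// Sketch: shared per-stage data (e.g. the RDD graph) may only be
// deleted once no other attempt of the stage remains. StageKey is a
// stand-in for the composite Array(stageId, attemptId) key.
final case class StageKey(stageId: Int, attemptId: Int)

object StageCleanupSketch {
  def hasMoreAttempts(remaining: Set[StageKey], stageId: Int): Boolean =
    remaining.exists(_.stageId == stageId)

  def main(args: Array[String]): Unit = {
    val afterDelete = Set(StageKey(1, 1), StageKey(2, 0)) // StageKey(1, 0) was just deleted
    println(hasMoreAttempts(afterDelete, stageId = 1)) // true: keep stage 1's graph
    println(hasMoreAttempts(afterDelete - StageKey(2, 0), stageId = 2)) // false: delete it
  }
}
```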

@@ -919,9 +925,9 @@ private[spark] class AppStatusListener(

     // Try to delete finished tasks only.
     val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
-      !live || t.info.status != TaskState.RUNNING.toString()
+      !live || t.status != TaskState.RUNNING.toString()
     }
-    toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+    toDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
     stage.savedTasks.addAndGet(-toDelete.size)
 
     // If there are more running tasks than the configured limit, delete running tasks. This
@@ -930,13 +936,34 @@ private[spark] class AppStatusListener(
       val remaining = countToDelete - toDelete.size
       if (remaining > 0) {
         val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
-        runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+        runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
         stage.savedTasks.addAndGet(-remaining)
       }
+
+      // On live applications, cleanup any cached quantiles for the stage. This makes sure that
+      // quantiles will be recalculated after tasks are replaced with newer ones.
+      //
+      // This is not needed in the SHS since caching only happens after the event logs are
+      // completely processed.
+      if (live) {
+        cleanupCachedQuantiles(stageKey)
+      }
     }
     stage.cleaning = false
   }
+
+  private def cleanupCachedQuantiles(stageKey: Array[Int]): Unit = {
+    val cachedQuantiles = kvstore.view(classOf[CachedQuantile])
+      .index("stage")
+      .first(stageKey)
+      .last(stageKey)
+      .asScala
+      .toList
+    cachedQuantiles.foreach { q =>
+      kvstore.delete(q.getClass(), q.id)
+    }
+  }
 
   /**
    * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
    * asynchronously, this method may return 0 in case enough items have been deleted already.
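The cleanup above is two-phase: evict finished tasks first, fall back to running tasks only if the quota is still unmet, and on live applications invalidate any cached quantiles so they are recomputed from the surviving tasks. A compact sketch of the two-phase selection policy; the `Task` type is an illustrative stand-in:

```scala
// Sketch: two-phase eviction -- prefer finished tasks, fall back to
// running ones only for the shortfall. Task is an illustrative type.
final case class Task(id: Long, running: Boolean)

object TaskEvictionSketch {
  def selectForDeletion(tasks: Seq[Task], countToDelete: Int): Seq[Task] = {
    val finished = tasks.filter(!_.running).take(countToDelete)
    val shortfall = countToDelete - finished.size
    val running = if (shortfall > 0) tasks.filter(_.running).take(shortfall) else Nil
    finished ++ running
  }

  def main(args: Array[String]): Unit = {
    val tasks = Seq(Task(1, running = true), Task(2, running = false), Task(3, running = true))
    // Only one finished task exists, so one running task is also evicted.
    println(selectForDeletion(tasks, 2).map(_.id)) // List(2, 1)
  }
}
```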