40 changes: 29 additions & 11 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -58,6 +58,12 @@ private[spark] class AppStatusListener(
   // operations that we can live without when rapidly processing incoming task events.
   private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
 
+  /**
+   * Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
+   * task events are not fired frequently.
+   */
+  private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)
+
   private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
   private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
 
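For context, the listener now has two independent throttles. A minimal sketch of how they relate, using the default values (the names mirror the fields above; this is an illustration, not the Spark implementation):

```scala
import java.util.concurrent.TimeUnit

// liveUpdatePeriodNs: an individual entity is rewritten to the store only if
// its last write is older than this (pre-existing behavior, default 100ms).
val liveUpdatePeriodNs: Long = TimeUnit.MILLISECONDS.toNanos(100)

// liveUpdateMinFlushPeriod: a sweep over all live entities runs at most this
// often, driven by periodic events such as executor heartbeats (this change,
// default 1s).
val liveUpdateMinFlushPeriod: Long = TimeUnit.SECONDS.toNanos(1)
```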
@@ -73,6 +79,9 @@ private[spark] class AppStatusListener(
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
 
+  /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */
+  private var lastFlushTimeNs = System.nanoTime()
+
   kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
     { count => cleanupExecutors(count) }
 
@@ -86,7 +95,8 @@ private[spark] class AppStatusListener(
 
   kvstore.onFlush {
     if (!live) {
-      flush()
+      val now = System.nanoTime()
+      flush(update(_, now))
     }
   }
 
@@ -744,6 +754,15 @@ private[spark] class AppStatusListener(
         }
       }
     }
+
+    // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
+    // here to ensure the staleness of Spark UI doesn't last more than
+    // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
+    if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) {
+      flush(maybeUpdate(_, now))
+      // Re-get the current system time because `flush` may be slow and `now` is stale.
+      lastFlushTimeNs = System.nanoTime()
+    }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
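The heartbeat-driven flush above is a plain time-based throttle: the check runs whenever an event arrives, so UI staleness is bounded by `max(heartbeat interval, liveUpdateMinFlushPeriod)`. A self-contained sketch of the pattern, assuming a one-second period and a caller-supplied `doFlush` action (both illustrative):

```scala
import java.util.concurrent.TimeUnit

object ThrottledFlush {
  private val minFlushPeriodNs = TimeUnit.SECONDS.toNanos(1)
  private var lastFlushTimeNs = System.nanoTime()

  /** Called on every periodic event (e.g. a heartbeat); flushes at most once per period. */
  def onPeriodicEvent(doFlush: () => Unit): Unit = {
    val now = System.nanoTime()
    if (now - lastFlushTimeNs > minFlushPeriodNs) {
      doFlush()
      // Re-read the clock: doFlush() may be slow enough that `now` is already stale.
      lastFlushTimeNs = System.nanoTime()
    }
  }
}
```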
@@ -755,18 +774,17 @@
     }
   }
 
-  /** Flush all live entities' data to the underlying store. */
-  private def flush(): Unit = {
-    val now = System.nanoTime()
+  /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
+  private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
     liveStages.values.asScala.foreach { stage =>
-      update(stage, now)
-      stage.executorSummaries.values.foreach(update(_, now))
+      entityFlushFunc(stage)
+      stage.executorSummaries.values.foreach(entityFlushFunc)
     }
-    liveJobs.values.foreach(update(_, now))
-    liveExecutors.values.foreach(update(_, now))
-    liveTasks.values.foreach(update(_, now))
-    liveRDDs.values.foreach(update(_, now))
-    pools.values.foreach(update(_, now))
+    liveJobs.values.foreach(entityFlushFunc)
+    liveExecutors.values.foreach(entityFlushFunc)
+    liveTasks.values.foreach(entityFlushFunc)
+    liveRDDs.values.foreach(entityFlushFunc)
+    pools.values.foreach(entityFlushFunc)
   }
 
   /**
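The refactored `flush` takes the per-entity action as a parameter, so both call sites share one traversal while choosing different write policies: `kvstore.onFlush` passes the unconditional `update`, and the heartbeat path passes the rate-limited `maybeUpdate`. A self-contained sketch of this higher-order pattern (all names below are stand-ins, not Spark's implementations):

```scala
// Stand-ins for Spark's LiveEntity, update and maybeUpdate.
final case class Entity(name: String, var lastWriteNs: Long = 0L)

def update(e: Entity, now: Long): Unit = {
  e.lastWriteNs = now
  println(s"wrote ${e.name}")
}

def maybeUpdate(e: Entity, now: Long, periodNs: Long): Unit =
  if (now - e.lastWriteNs > periodNs) update(e, now)

// The traversal is written once; each caller picks the write policy.
def flush(entities: Seq[Entity])(entityFlushFunc: Entity => Unit): Unit =
  entities.foreach(entityFlushFunc)

val entities = Seq(Entity("job-1"), Entity("stage-2"))
val now = System.nanoTime()
flush(entities)(update(_, now))                       // forced, as in kvstore.onFlush
flush(entities)(maybeUpdate(_, now, 100L * 1000000L)) // throttled, as on heartbeat
```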
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/status/config.scala
@@ -31,6 +31,12 @@ private[spark] object config {
     .timeConf(TimeUnit.NANOSECONDS)
     .createWithDefaultString("100ms")
 
+  val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD = ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod")
+    .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when " +
+      "incoming task events are not fired frequently.")
+    .timeConf(TimeUnit.NANOSECONDS)
+    .createWithDefaultString("1s")
+
   val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
     .intConf
     .createWithDefault(1000)
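From the user's side, the new key behaves like any other `spark.ui.*` setting. A hedged example of setting it programmatically (the two keys come from this patch; the app name and values are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Flush live UI data at least every 2s, even when task events are rare.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("ui-freshness-demo")
  .set("spark.ui.liveUpdate.period", "100ms")       // existing knob
  .set("spark.ui.liveUpdate.minFlushPeriod", "2s")  // added by this patch
val sc = new SparkContext(conf)
```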
35 changes: 32 additions & 3 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.api.java.StorageLevels
 import org.apache.spark.deploy.history.HistoryServerSuite
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
+import org.apache.spark.internal.config.{EXECUTOR_HEARTBEAT_INTERVAL, MEMORY_OFFHEAP_SIZE}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
 import org.apache.spark.status.config._
@@ -99,14 +99,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
    * Create a test SparkContext with the SparkUI enabled.
    * It is safe to `get` the SparkUI directly from the SparkContext returned here.
    */
-  private def newSparkContext(killEnabled: Boolean = true): SparkContext = {
+  private def newSparkContext(
+      killEnabled: Boolean = true,
+      master: String = "local",
+      additionalConfs: Map[String, String] = Map.empty): SparkContext = {
     val conf = new SparkConf()
-      .setMaster("local")
+      .setMaster(master)
       .setAppName("test")
       .set("spark.ui.enabled", "true")
       .set("spark.ui.port", "0")
       .set("spark.ui.killEnabled", killEnabled.toString)
       .set(MEMORY_OFFHEAP_SIZE.key, "64m")
+    additionalConfs.foreach { case (k, v) => conf.set(k, v) }
     val sc = new SparkContext(conf)
     assert(sc.ui.isDefined)
     sc
@@ -724,6 +728,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
     }
   }
 
+  test("Staleness of Spark UI should not last minutes or hours") {
+    withSpark(newSparkContext(
+      master = "local[2]",
+      // Set a small heart beat interval to make the test fast
+      additionalConfs = Map(
+        EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms",
+        LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc =>
+      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+      val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
+        // Make the task never finish so there won't be any task start/end events after the first 2
+        // tasks start.
+        Thread.sleep(300000)
+      }
+      try {
+        eventually(timeout(10.seconds)) {
+          val jobsJson = getJson(sc.ui.get, "jobs")
+          jobsJson.children.length should be (1)
+          (jobsJson.children.head \ "numActiveTasks").extract[Int] should be (2)
+        }
+      } finally {
+        f.cancel()
+      }
+    }
+  }
+
   def goToUi(sc: SparkContext, path: String): Unit = {
     goToUi(sc.ui.get, path)
   }
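The test pins two tasks forever so that no further task start/end events arrive, then polls the jobs endpoint until the heartbeat-driven flush makes `numActiveTasks = 2` visible. A sketch of the ScalaTest polling idiom it relies on, with a hypothetical `fetchNumActiveTasks` standing in for the suite's `getJson` helper:

```scala
import java.util.concurrent.TimeUnit
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Hypothetical stand-in for querying the UI's JSON API; it simulates the
// active-task count becoming visible after roughly one flush period.
val start = System.nanoTime()
def fetchNumActiveTasks(): Int =
  if (System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(50)) 2 else 0

// `eventually` retries its block until it passes or the timeout expires, so
// staleness bounded by the flush period cannot spuriously fail the assertion.
eventually(timeout(10.seconds)) {
  assert(fetchNumActiveTasks() == 2)
}
```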
8 changes: 8 additions & 0 deletions docs/configuration.md
@@ -825,6 +825,14 @@ Apart from these, the following properties are also available, and may be useful
     operations that we can live without when rapidly processing incoming task events.
   </td>
 </tr>
+<tr>
+  <td><code>spark.ui.liveUpdate.minFlushPeriod</code></td>
+  <td>1s</td>
+  <td>
+    Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
+    task events are not fired frequently.
+  </td>
+</tr>
 <tr>
   <td><code>spark.ui.port</code></td>
   <td>4040</td>