@@ -29,6 +29,12 @@ private[spark] object Status {
.timeConf(TimeUnit.NANOSECONDS)
.createWithDefaultString("100ms")

val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD = ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod")
.doc("Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when " +
"incoming task events are not fired frequently.")
.timeConf(TimeUnit.NANOSECONDS)
.createWithDefaultString("1s")

val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
.intConf
.createWithDefault(1000)
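For orientation, the new property can be set like any other UI config when the application is created. A minimal sketch; the app name, master, and the 500ms value are illustrative assumptions, not values from this PR:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: lower the flush floor below the 1s default so a live UI
// refreshes stale entities sooner when task events arrive infrequently.
val conf = new SparkConf()
  .setAppName("live-ui-flush-demo")                   // hypothetical app name
  .setMaster("local[2]")                              // hypothetical master
  .set("spark.ui.liveUpdate.minFlushPeriod", "500ms") // config added by this PR
  .set("spark.ui.liveUpdate.period", "100ms")         // existing config; 100ms is its default

val sc = new SparkContext(conf)
```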
39 changes: 28 additions & 11 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -58,6 +58,12 @@ private[spark] class AppStatusListener(
// operations that we can live without when rapidly processing incoming task events.
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

/**
* Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
* task events are not fired frequently.
*/
private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)

private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)

@@ -76,6 +82,9 @@ private[spark] class AppStatusListener(
// around liveExecutors.
@volatile private var activeExecutorCount = 0

/** The last time `LiveEntity`s were flushed. This is to avoid flushing too frequently. */
private var lastFlushTimeNs = System.nanoTime()

kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }

@@ -89,7 +98,8 @@

kvstore.onFlush {
if (!live) {
flush()
val now = System.nanoTime()
flush(update(_, now))
}
}

@@ -831,6 +841,14 @@ private[spark] class AppStatusListener(
}
}
}
// Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
// here to ensure the staleness of Spark UI doesn't last more than
// `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) {
flush(maybeUpdate(_, now))
Contributor:

Hmm... in the bug you mention that job-level data is not being updated. Is that the only case? Because if that's it, then this looks like overkill. You could e.g. update the jobs in the code that handles event.accumUpdates above, or even just flush jobs specifically, instead of everything.

Doing a full flush here seems like overkill and a little expensive when you think about many heartbeats arriving in a short period (even when considering lastFlushTimeNs).

Member Author:

> Hmm... in the bug you mention that job-level data is not being updated. Is that the only case?

I also noticed that executor active tasks sometimes could be wrong. That's why I decided to flush everything, to make sure we don't miss any places. It's also hard to maintain if we need to manually flush in every place.

Ideally, we should flush periodically so that it doesn't depend on receiving a Spark event. But then I would need to add a new event type and post it to the listener bus. That's overkill.

> when you think about many heartbeats arriving in a short period

There will be at least 100ms between each flush. As long as we process heartbeats very fast, most of them won't trigger a flush.

Contributor:

If the goal is to use the heartbeats as a trigger for flushing, how about using some ratio of the heartbeat period instead of liveUpdatePeriodNs to control whether to flush everything?

Really large apps can get a little backed up when processing heartbeats from lots and lots of busy executors, and this would make it a little worse.

Member:

The update only happens in the live UI, which should be fine in general. For really large apps, would it help to set LIVE_ENTITY_UPDATE_PERIOD to a larger value? Setting a ratio of the heartbeat period seems a bit complex.

Contributor:

> only happens in live UI

The "don't write to the store all the time" thing was added specifically to speed up live UIs, because copying + writing the data (even to the memory store) becomes really expensive when you have event storms (think thousands of tasks starting and stopping in a very short period).

> setting LIVE_ENTITY_UPDATE_PERIOD to a larger value

We should avoid requiring configuration tweaks for things not to break, when possible.

// Re-get the current system time because `flush` may be slow and `now` is stale.
lastFlushTimeNs = System.nanoTime()
}
}

override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
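As a side note on the heartbeat handler above: the guard plus the re-read of the clock after the flush is what bounds how often the full flush can run, even when the flush itself is slow. A minimal, self-contained sketch of the same throttling idea, with hypothetical names and periods (not the actual listener code):

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the throttled-flush pattern used in the heartbeat handler.
object ThrottledFlush {
  private val minFlushPeriodNs = TimeUnit.SECONDS.toNanos(1) // stand-in for liveUpdateMinFlushPeriod
  private var lastFlushTimeNs = System.nanoTime()

  private def flushEverything(): Unit = {
    Thread.sleep(50) // placeholder for the expensive write of all live entities
  }

  // Called from some periodic event (an executor heartbeat in the real listener).
  def onPeriodicEvent(): Unit = {
    val now = System.nanoTime()
    if (now - lastFlushTimeNs > minFlushPeriodNs) {
      flushEverything()
      // Re-read the clock: if flushEverything() was slow, reusing `now` would make
      // the next event eligible to flush again too early.
      lastFlushTimeNs = System.nanoTime()
    }
  }
}
```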
@@ -856,18 +874,17 @@
}
}

/** Flush all live entities' data to the underlying store. */
private def flush(): Unit = {
val now = System.nanoTime()
/** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
liveStages.values.asScala.foreach { stage =>
update(stage, now)
stage.executorSummaries.values.foreach(update(_, now))
entityFlushFunc(stage)
stage.executorSummaries.values.foreach(entityFlushFunc)
}
liveJobs.values.foreach(update(_, now))
liveExecutors.values.foreach(update(_, now))
liveTasks.values.foreach(update(_, now))
liveRDDs.values.foreach(update(_, now))
pools.values.foreach(update(_, now))
liveJobs.values.foreach(entityFlushFunc)
liveExecutors.values.foreach(entityFlushFunc)
liveTasks.values.foreach(entityFlushFunc)
liveRDDs.values.foreach(entityFlushFunc)
pools.values.foreach(entityFlushFunc)
}

/**
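The point of the refactor above is that callers now choose the write policy: the kvstore.onFlush path passes `update(_, now)` (write unconditionally), while the heartbeat path passes `maybeUpdate(_, now)` (write only entities that haven't been written recently). A self-contained sketch of that distinction with simplified stand-in types; the real helpers in AppStatusListener differ in detail:

```scala
import java.util.concurrent.TimeUnit

// Simplified stand-ins, not the actual Spark classes.
class FakeLiveEntity(val name: String) {
  var lastWriteTime: Long = -1L
  def write(now: Long): Unit = { lastWriteTime = now; println(s"wrote $name") }
}

// spark.ui.liveUpdate.period defaults to 100ms.
val liveUpdatePeriodNs = TimeUnit.MILLISECONDS.toNanos(100)

def update(entity: FakeLiveEntity, now: Long): Unit = entity.write(now)

def maybeUpdate(entity: FakeLiveEntity, now: Long): Unit = {
  // Rate-limited write: skip entities written within the update period.
  if (now - entity.lastWriteTime > liveUpdatePeriodNs) update(entity, now)
}

def flush(entityFlushFunc: FakeLiveEntity => Unit, entities: Seq[FakeLiveEntity]): Unit =
  entities.foreach(entityFlushFunc)

val now = System.nanoTime()
flush(update(_, now), Seq(new FakeLiveEntity("job"), new FakeLiveEntity("stage"))) // onFlush path
flush(maybeUpdate(_, now), Seq(new FakeLiveEntity("executor")))                    // heartbeat path
```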
33 changes: 31 additions & 2 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -100,14 +100,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
* Create a test SparkContext with the SparkUI enabled.
* It is safe to `get` the SparkUI directly from the SparkContext returned here.
*/
private def newSparkContext(killEnabled: Boolean = true): SparkContext = {
private def newSparkContext(
killEnabled: Boolean = true,
master: String = "local",
additionalConfs: Map[String, String] = Map.empty): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
.setMaster(master)
.setAppName("test")
.set(UI_ENABLED, true)
.set(UI_PORT, 0)
.set(UI_KILL_ENABLED, killEnabled)
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
@@ -725,6 +729,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}

test("Staleness of Spark UI should not last minutes or hours") {
withSpark(newSparkContext(
master = "local[2]",
// Set a small heartbeat interval to make the test fast
additionalConfs = Map(
EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms",
LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc =>
sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
// Make the task never finish so there won't be any task start/end events after the first 2
// tasks start.
Thread.sleep(300000)
Contributor:

Nit: what about a sleep of less than 5 minutes here, something comparable with the eventually timeout, like:

        Thread.sleep(20.seconds.toMillis)

Member Author:

I turned on SPARK_JOB_INTERRUPT_ON_CANCEL, so there's no need to change the sleep time.

Contributor:

I have checked and the thread leaks are gone.

}
try {
eventually(timeout(10.seconds)) {
val jobsJson = getJson(sc.ui.get, "jobs")
jobsJson.children.length should be (1)
(jobsJson.children.head \ "numActiveTasks").extract[Int] should be (2)
}
} finally {
f.cancel()
}
}
}

def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}
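For a manual check outside the test, the same jobs data is available from a running application's REST API (the live UI's /api/v1 endpoints), which is what the suite's getJson helper exercises. A hedged example; the port and application id depend on the running app:

```scala
import scala.io.Source

// Hypothetical app id; 4040 is the default UI port of the first application on a host.
val appId = "app-00000000000000-0000"
val jobsJson = Source.fromURL(s"http://localhost:4040/api/v1/applications/$appId/jobs").mkString
println(jobsJson) // each job entry includes numActiveTasks, as asserted in the test above
```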
8 changes: 8 additions & 0 deletions docs/configuration.md
@@ -874,6 +874,14 @@ Apart from these, the following properties are also available, and may be useful
operations that we can live without when rapidly processing incoming task events.
</td>
</tr>
<tr>
<td><code>spark.ui.liveUpdate.minFlushPeriod</code></td>
<td>1s</td>
<td>
Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
task events are not fired frequently.
</td>
</tr>
<tr>
<td><code>spark.ui.port</code></td>
<td>4040</td>