From ee53708cc9c19fe3c6026c840604043298eccdb8 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 4 Apr 2019 14:28:38 -0700 Subject: [PATCH 1/7] fix --- .../spark/status/AppStatusListener.scala | 24 ++++++++------- .../org/apache/spark/ui/UISeleniumSuite.scala | 30 +++++++++++++++++-- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index a3e82424be6e6..a88ed65d0309b 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -89,7 +89,7 @@ private[spark] class AppStatusListener( kvstore.onFlush { if (!live) { - flush() + flush(update(_, _)) } } @@ -831,6 +831,10 @@ private[spark] class AppStatusListener( } } } + // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush + // here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat + // interval. + flush(maybeUpdate(_, _)) } override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { @@ -856,18 +860,18 @@ private[spark] class AppStatusListener( } } - /** Flush all live entities' data to the underlying store. */ - private def flush(): Unit = { + /** Go through all `LiveEntity`s and use `entityFlushFunc(entity, now)` to flush them. */ + private def flush(entityFlushFunc: (LiveEntity, Long) => Unit): Unit = { val now = System.nanoTime() liveStages.values.asScala.foreach { stage => - update(stage, now) - stage.executorSummaries.values.foreach(update(_, now)) + entityFlushFunc(stage, now) + stage.executorSummaries.values.foreach(entityFlushFunc(_, now)) } - liveJobs.values.foreach(update(_, now)) - liveExecutors.values.foreach(update(_, now)) - liveTasks.values.foreach(update(_, now)) - liveRDDs.values.foreach(update(_, now)) - pools.values.foreach(update(_, now)) + liveJobs.values.foreach(entityFlushFunc(_, now)) + liveExecutors.values.foreach(entityFlushFunc(_, now)) + liveTasks.values.foreach(entityFlushFunc(_, now)) + liveRDDs.values.foreach(entityFlushFunc(_, now)) + pools.values.foreach(entityFlushFunc(_, now)) } /** diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index cdc7185a35512..7da47faa0938e 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -100,14 +100,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B * Create a test SparkContext with the SparkUI enabled. * It is safe to `get` the SparkUI directly from the SparkContext returned here. */ - private def newSparkContext(killEnabled: Boolean = true): SparkContext = { + private def newSparkContext( + killEnabled: Boolean = true, + master: String = "local", + additionalConfs: Map[String, String] = Map.empty): SparkContext = { val conf = new SparkConf() - .setMaster("local") + .setMaster(master) .setAppName("test") .set(UI_ENABLED, true) .set(UI_PORT, 0) .set(UI_KILL_ENABLED, killEnabled) .set(MEMORY_OFFHEAP_SIZE.key, "64m") + additionalConfs.foreach { case (k, v) => conf.set(k, v) } val sc = new SparkContext(conf) assert(sc.ui.isDefined) sc @@ -725,6 +729,28 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } + test("Staleness of Spark UI should not last minutes or hours") { + withSpark(newSparkContext( + master = "local[2]", + // Set a small heart beat interval to make the test fast + additionalConfs = Map(EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms"))) { sc => + val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ => + // Make the task never finish so there won't be any task start/end events after the first 2 + // tasks start. + Thread.sleep(300000) + } + try { + eventually(timeout(10.seconds)) { + val jobsJson = getJson(sc.ui.get, "jobs") + jobsJson.children.length should be (1) + (jobsJson.children.head \ "numActiveTasks").extract[Int] should be (2) + } + } finally { + f.cancel() + } + } + } + def goToUi(sc: SparkContext, path: String): Unit = { goToUi(sc.ui.get, path) } From 1f927a25a5cc7dc87e8133a2c9cab41b3e109c3b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 4 Apr 2019 14:56:41 -0700 Subject: [PATCH 2/7] avoid to traverse all entities too frequently --- .../spark/status/AppStatusListener.scala | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index a88ed65d0309b..23c90f0483146 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -76,6 +76,9 @@ private[spark] class AppStatusListener( // around liveExecutors. @volatile private var activeExecutorCount = 0 + /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */ + private var lastFlushTimeNs = System.nanoTime() + kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) { count => cleanupExecutors(count) } @@ -89,7 +92,8 @@ private[spark] class AppStatusListener( kvstore.onFlush { if (!live) { - flush(update(_, _)) + val now = System.nanoTime() + flush(update(_, now)) } } @@ -834,7 +838,11 @@ private[spark] class AppStatusListener( // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush // here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat // interval. - flush(maybeUpdate(_, _)) + if (now - lastFlushTimeNs > liveUpdatePeriodNs) { + flush(maybeUpdate(_, now)) + // Re-get the current system time because `flush` may be slow and `now` is stale. + lastFlushTimeNs = System.nanoTime() + } } override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { @@ -860,18 +868,17 @@ private[spark] class AppStatusListener( } } - /** Go through all `LiveEntity`s and use `entityFlushFunc(entity, now)` to flush them. */ - private def flush(entityFlushFunc: (LiveEntity, Long) => Unit): Unit = { - val now = System.nanoTime() + /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ + private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { liveStages.values.asScala.foreach { stage => - entityFlushFunc(stage, now) - stage.executorSummaries.values.foreach(entityFlushFunc(_, now)) + entityFlushFunc(stage) + stage.executorSummaries.values.foreach(entityFlushFunc) } - liveJobs.values.foreach(entityFlushFunc(_, now)) - liveExecutors.values.foreach(entityFlushFunc(_, now)) - liveTasks.values.foreach(entityFlushFunc(_, now)) - liveRDDs.values.foreach(entityFlushFunc(_, now)) - pools.values.foreach(entityFlushFunc(_, now)) + liveJobs.values.foreach(entityFlushFunc) + liveExecutors.values.foreach(entityFlushFunc) + liveTasks.values.foreach(entityFlushFunc) + liveRDDs.values.foreach(entityFlushFunc) + pools.values.foreach(entityFlushFunc) } /** From 5a04be90adec663c91c88dbac984690e46caee32 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 5 Apr 2019 10:14:08 -0700 Subject: [PATCH 3/7] add a config for flushing all entities --- .../org/apache/spark/internal/config/Status.scala | 10 ++++++++++ .../apache/spark/status/AppStatusListener.scala | 14 +++++++++++--- .../org/apache/spark/ui/UISeleniumSuite.scala | 4 +++- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index c56157227f8fc..22c4ddff4a9cd 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -29,6 +29,16 @@ private[spark] object Status { .timeConf(TimeUnit.NANOSECONDS) .createWithDefaultString("100ms") + val LIVE_ENTITY_UPDATE_STALENESS_LIMIT = ConfigBuilder("spark.ui.liveUpdate.stalenessLimit") + .internal() + .doc( + """A time limit before we force to flush all live entities. When the last flush does't past + |this limit, UI will not trigger a heavy flush to sync the states since it may slow down + |Spark events processing significantly. Otherwise, UI will try to flush as soon as possible. + """.stripMargin) + .timeConf(TimeUnit.NANOSECONDS) + .createWithDefaultString("1s") + val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs") .intConf .createWithDefault(1000) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 23c90f0483146..2fe171148c6b7 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -58,6 +58,14 @@ private[spark] class AppStatusListener( // operations that we can live without when rapidly processing incoming task events. private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + /** + * A time limit before we force to flush all live entities. When the last flush does't past + * this limit, UI will not trigger a heavy flush to sync the states since it may slow down Spark + * events processing significantly. Otherwise, UI will try to flush when receiving the next + * executor heartbeat. + */ + private val liveUpdateStalenessLimit = conf.get(LIVE_ENTITY_UPDATE_STALENESS_LIMIT) + private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE) private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES) @@ -836,9 +844,9 @@ private[spark] class AppStatusListener( } } // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush - // here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat - // interval. - if (now - lastFlushTimeNs > liveUpdatePeriodNs) { + // here to ensure the staleness of Spark UI doesn't last more than + // `max(heartbeat interval, liveUpdateStalenessLimit)`. + if (now - lastFlushTimeNs > liveUpdateStalenessLimit) { flush(maybeUpdate(_, now)) // Re-get the current system time because `flush` may be slow and `now` is stale. lastFlushTimeNs = System.nanoTime() diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 7da47faa0938e..144dd260809e5 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -733,7 +733,9 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B withSpark(newSparkContext( master = "local[2]", // Set a small heart beat interval to make the test fast - additionalConfs = Map(EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms"))) { sc => + additionalConfs = Map( + EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms", + LIVE_ENTITY_UPDATE_STALENESS_LIMIT.key -> "10ms"))) { sc => val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ => // Make the task never finish so there won't be any task start/end events after the first 2 // tasks start. From 289e9968bbeef46d65693c95e65e38af5089cf97 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 5 Apr 2019 12:57:49 -0700 Subject: [PATCH 4/7] nit --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 2fe171148c6b7..99fa8a98facd9 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -59,7 +59,7 @@ private[spark] class AppStatusListener( private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L /** - * A time limit before we force to flush all live entities. When the last flush does't past + * A time limit before we force to flush all live entities. When the last flush doesn't past * this limit, UI will not trigger a heavy flush to sync the states since it may slow down Spark * events processing significantly. Otherwise, UI will try to flush when receiving the next * executor heartbeat. From 2645f35651cf9ac68e1e4b7d1291469f4a670587 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 5 Apr 2019 13:04:28 -0700 Subject: [PATCH 5/7] nit 2 --- .../main/scala/org/apache/spark/internal/config/Status.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index 22c4ddff4a9cd..b0b78b1d8302e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -32,7 +32,7 @@ private[spark] object Status { val LIVE_ENTITY_UPDATE_STALENESS_LIMIT = ConfigBuilder("spark.ui.liveUpdate.stalenessLimit") .internal() .doc( - """A time limit before we force to flush all live entities. When the last flush does't past + """A time limit before we force to flush all live entities. When the last flush doesn't past |this limit, UI will not trigger a heavy flush to sync the states since it may slow down |Spark events processing significantly. Otherwise, UI will try to flush as soon as possible. """.stripMargin) From 39ea3571c289ef51429cfa83f00758ef6806a611 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 5 Apr 2019 15:26:02 -0700 Subject: [PATCH 6/7] interrupt task --- core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 144dd260809e5..9b1d1d3ecf97b 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -736,6 +736,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B additionalConfs = Map( EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms", LIVE_ENTITY_UPDATE_STALENESS_LIMIT.key -> "10ms"))) { sc => + sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true") val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ => // Make the task never finish so there won't be any task start/end events after the first 2 // tasks start. From 1c5307161c1731112b8d7b096778d0f56101d04b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 8 Apr 2019 15:21:48 -0700 Subject: [PATCH 7/7] Address --- .../org/apache/spark/internal/config/Status.scala | 10 +++------- .../org/apache/spark/status/AppStatusListener.scala | 12 +++++------- .../scala/org/apache/spark/ui/UISeleniumSuite.scala | 2 +- docs/configuration.md | 8 ++++++++ 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index b0b78b1d8302e..3e6a4e9810664 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -29,13 +29,9 @@ private[spark] object Status { .timeConf(TimeUnit.NANOSECONDS) .createWithDefaultString("100ms") - val LIVE_ENTITY_UPDATE_STALENESS_LIMIT = ConfigBuilder("spark.ui.liveUpdate.stalenessLimit") - .internal() - .doc( - """A time limit before we force to flush all live entities. When the last flush doesn't past - |this limit, UI will not trigger a heavy flush to sync the states since it may slow down - |Spark events processing significantly. Otherwise, UI will try to flush as soon as possible. - """.stripMargin) + val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD = ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod") + .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when " + + "incoming task events are not fired frequently.") .timeConf(TimeUnit.NANOSECONDS) .createWithDefaultString("1s") diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 99fa8a98facd9..b085f21f2d5cc 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -59,12 +59,10 @@ private[spark] class AppStatusListener( private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L /** - * A time limit before we force to flush all live entities. When the last flush doesn't past - * this limit, UI will not trigger a heavy flush to sync the states since it may slow down Spark - * events processing significantly. Otherwise, UI will try to flush when receiving the next - * executor heartbeat. + * Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming + * task events are not fired frequently. */ - private val liveUpdateStalenessLimit = conf.get(LIVE_ENTITY_UPDATE_STALENESS_LIMIT) + private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD) private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE) private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES) @@ -845,8 +843,8 @@ private[spark] class AppStatusListener( } // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush // here to ensure the staleness of Spark UI doesn't last more than - // `max(heartbeat interval, liveUpdateStalenessLimit)`. - if (now - lastFlushTimeNs > liveUpdateStalenessLimit) { + // `max(heartbeat interval, liveUpdateMinFlushPeriod)`. + if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) { flush(maybeUpdate(_, now)) // Re-get the current system time because `flush` may be slow and `now` is stale. lastFlushTimeNs = System.nanoTime() diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 9b1d1d3ecf97b..a9f03ebb72b2b 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -735,7 +735,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B // Set a small heart beat interval to make the test fast additionalConfs = Map( EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms", - LIVE_ENTITY_UPDATE_STALENESS_LIMIT.key -> "10ms"))) { sc => + LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc => sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true") val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ => // Make the task never finish so there won't be any task start/end events after the first 2 diff --git a/docs/configuration.md b/docs/configuration.md index 3289954865953..1c7c9cf444760 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -874,6 +874,14 @@ Apart from these, the following properties are also available, and may be useful operations that we can live without when rapidly processing incoming task events. + + spark.ui.liveUpdate.minFlushPeriod + 1s + + Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming + task events are not fired frequently. + + spark.ui.port 4040