From f24bf52e9712bf7879deef4a4565fcc5d9497237 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Mon, 20 Feb 2017 16:42:23 +0800 Subject: [PATCH 1/2] Filter the driver accumulator updates to prevent NoSuchElementException --- .../org/apache/spark/sql/execution/ui/SQLListener.scala | 7 +++++-- .../apache/spark/sql/execution/ui/SQLListenerSuite.scala | 4 ++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 5daf21595d8a2..12d3bc9281f35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging { accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield { (accumulatorUpdate._1, accumulatorUpdate._2) } - }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) } + } val driverUpdates = executionUIData.driverAccumUpdates.toSeq - mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId => + val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter { + case (id, _) => executionUIData.accumulatorMetrics.contains(id) + } + mergeAccumulatorUpdates(totalUpdates, accumulatorId => executionUIData.accumulatorMetrics(accumulatorId).metricType) case None => // This execution has been dropped diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 8aea112897fb3..1b01597c8abbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -147,6 +147,10 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with 
JsonTest checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + // Non-existing driver accumulator updates should be filtered and no exception will be thrown. + listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), From 62398ec902d58368d4186ea4f167263bb8df8612 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Thu, 23 Feb 2017 16:02:30 +0800 Subject: [PATCH 2/2] Update the comment --- .../org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 1b01597c8abbd..e41c00ecec271 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -147,7 +147,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) - // Non-existing driver accumulator updates should be filtered and no exception will be thrown. + // Driver accumulator updates that don't belong to this execution should be filtered and no + // exception will be thrown. listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))