Skip to content

Commit d989434

Browse files
carsonwangcloud-fan
authored andcommitted
[SPARK-19674][SQL] Ignore driver accumulator updates don't belong to …
[SPARK-19674][SQL] Ignore driver accumulator updates don't belong to the execution when merging all accumulator updates N.B. This is a backport to branch-2.1 of #17009. ## What changes were proposed in this pull request? In SQLListener.getExecutionMetrics, driver accumulator updates don't belong to the execution should be ignored when merging all accumulator updates to prevent NoSuchElementException. ## How was this patch tested? Updated unit test. Author: Carson Wang <carson.wangintel.com> Author: Carson Wang <[email protected]> Closes #17418 from mallman/spark-19674-backport_2.1.
1 parent 92f0b01 commit d989434

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
343343
accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
344344
(accumulatorUpdate._1, accumulatorUpdate._2)
345345
}
346-
}.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
346+
}
347347

348348
val driverUpdates = executionUIData.driverAccumUpdates.toSeq
349-
mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
349+
val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
350+
case (id, _) => executionUIData.accumulatorMetrics.contains(id)
351+
}
352+
mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
350353
executionUIData.accumulatorMetrics(accumulatorId).metricType)
351354
case None =>
352355
// This execution has been dropped

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
147147

148148
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
149149

150+
// Driver accumulator updates don't belong to this execution should be filtered and no
151+
// exception will be thrown.
152+
listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
153+
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
154+
150155
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
151156
// (task id, stage id, stage attempt, accum updates)
152157
(0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),

0 commit comments

Comments
 (0)