Skip to content

Commit 1a31665

Browse files
author
Marcelo Vanzin
committed
Fix a race.
1 parent bb7388b commit 1a31665

File tree

2 files changed

+13
-7
lines changed

2 files changed

+13
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,13 @@ private[sql] class SQLAppStatusListener(
131131
info.successful)
132132
}
133133

134-
def executionMetrics(executionId: Long): Map[Long, String] = {
134+
def liveExecutionMetrics(executionId: Long): Option[Map[Long, String]] = {
135135
Option(liveExecutions.get(executionId)).map { exec =>
136136
if (exec.metricsValues != null) {
137137
exec.metricsValues
138138
} else {
139139
aggregateMetrics(exec)
140140
}
141-
}.getOrElse {
142-
throw new NoSuchElementException(s"execution $executionId not found")
143141
}
144142
}
145143

@@ -207,7 +205,8 @@ private[sql] class SQLAppStatusListener(
207205
}
208206
}
209207

210-
// TODO: storing metrics by task ID can lead to inaccurate metrics when speculation is on.
208+
// TODO: storing metrics by task ID can cause metrics for the same task index to be
209+
// counted multiple times, for example due to speculation or re-attempts.
211210
metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
212211
}
213212
}

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,16 @@ private[sql] class SQLAppStatusStore(
5858
}
5959

6060
def executionMetrics(executionId: Long): Map[Long, String] = {
61-
val exec = store.read(classOf[SQLExecutionUIData], executionId)
62-
Option(exec.metricValues)
63-
.orElse(listener.map(_.executionMetrics(executionId)))
61+
def metricsFromStore(): Option[Map[Long, String]] = {
62+
val exec = store.read(classOf[SQLExecutionUIData], executionId)
63+
Option(exec.metricValues)
64+
}
65+
66+
metricsFromStore()
67+
.orElse(listener.flatMap(_.liveExecutionMetrics(executionId)))
68+
// Try a second time in case the execution finished while this method is trying to
69+
// get the metrics.
70+
.orElse(metricsFromStore())
6471
.getOrElse(Map())
6572
}
6673

0 commit comments

Comments
 (0)