Skip to content

Commit 42904b8

Browse files
daviesandrewor14
authored andcommitted
[SPARK-3465] fix task metrics aggregation in local mode
Before overwrite t.taskMetrics, take a deepcopy of it. Author: Davies Liu <[email protected]> Closes #2338 from davies/fix_metric and squashes the following commits: a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric 7c879e0 [Davies Liu] add more comments 754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true 5ca26dc [Davies Liu] fix task metrics aggregation in local mode
1 parent 33c7a73 commit 42904b8

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,16 @@ private[spark] class Executor(
360360
if (!taskRunner.attemptedTask.isEmpty) {
361361
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
362362
metrics.updateShuffleReadMetrics
363-
tasksMetrics += ((taskRunner.taskId, metrics))
363+
if (isLocal) {
364+
// JobProgressListener will hold an reference of it during
365+
// onExecutorMetricsUpdate(), then JobProgressListener can not see
366+
// the changes of metrics any more, so make a deep copy of it
367+
val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
368+
tasksMetrics += ((taskRunner.taskId, copiedMetrics))
369+
} else {
370+
// It will be copied by serialization
371+
tasksMetrics += ((taskRunner.taskId, metrics))
372+
}
364373
}
365374
}
366375
}

0 commit comments

Comments
 (0)