
Commit d42c67a

JoshRosen authored and cloud-fan committed
[SPARK-20776] Fix perf. problems in JobProgressListener caused by TaskMetrics construction
## What changes were proposed in this pull request?

In

```
./bin/spark-shell --master=local[64]
```

I ran

```
sc.parallelize(1 to 100000, 100000).count()
```

and profiled the time spent in the LiveListenerBus event processing thread. I discovered that the majority of the time was being spent in `TaskMetrics.empty` calls in `JobProgressListener.onTaskStart`. It turns out that we can slightly refactor to remove the need to construct one empty instance per call, greatly improving the performance of this code. The performance gains here help to avoid an issue where listener events would be dropped because the JobProgressListener couldn't keep up with the throughput.

**Before:**

![image](https://cloud.githubusercontent.com/assets/50748/26133095/95bcd42a-3a59-11e7-8051-a50550e447b8.png)

**After:**

![image](https://cloud.githubusercontent.com/assets/50748/26133070/7935e148-3a59-11e7-8c2d-73d5aa5a2397.png)

## How was this patch tested?

Benchmarks described above.

Author: Josh Rosen <[email protected]>

Closes #18008 from JoshRosen/nametoaccums-improvements.

(cherry picked from commit 30e0557)

Signed-off-by: Wenchen Fan <[email protected]>
1 parent 7076ab4 commit d42c67a
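The diffs below boil down to a simple allocation-avoidance pattern: instead of building (and converting) a fresh `TaskMetrics.empty` for every task-start event, the UI row defaults to one cached, immutable snapshot that is shared until real metrics arrive. The sketch below is a minimal, self-contained illustration of that pattern; `MetricsSnapshot`, `TaskRow`, and `SharedEmptyDemo` are hypothetical stand-in names, not the actual Spark classes.

```scala
// Illustrative sketch only: MetricsSnapshot and TaskRow are simplified
// stand-ins for TaskMetricsUIData and TaskUIData.
case class MetricsSnapshot(executorRunTime: Long, jvmGCTime: Long)

object MetricsSnapshot {
  // Built once and shared by every task that has not reported metrics yet,
  // instead of allocating a fresh metrics object per onTaskStart event.
  val EMPTY: MetricsSnapshot = MetricsSnapshot(0L, 0L)
}

class TaskRow(val taskId: Long) {
  // Rows start out pointing at the shared EMPTY value; an update just swaps
  // the reference, so the cached instance itself is never mutated.
  private var metrics: MetricsSnapshot = MetricsSnapshot.EMPTY
  def updateMetrics(m: MetricsSnapshot): Unit = { metrics = m }
  def currentMetrics: MetricsSnapshot = metrics
}

object SharedEmptyDemo extends App {
  // Simulate a burst of task-start events: no per-task metrics allocation.
  val rows = (1L to 100000L).map(id => new TaskRow(id))
  rows.head.updateMetrics(MetricsSnapshot(executorRunTime = 42L, jvmGCTime = 3L))
  println(rows.head.currentMetrics)                        // MetricsSnapshot(42,3)
  println(rows(1).currentMetrics eq MetricsSnapshot.EMPTY) // true: still shared
}
```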

3 files changed (+31, -30 lines)

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 2 additions & 3 deletions
@@ -329,13 +329,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val metrics = TaskMetrics.empty
       val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
         logWarning("Task start for unknown stage " + taskStart.stageId)
         new StageUIData
       })
       stageData.numActiveTasks += 1
-      stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics)))
+      stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo))
     }
     for (
       activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -405,7 +404,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
       }
 
-      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
+      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info))
       taskData.updateTaskInfo(info)
       taskData.updateTaskMetrics(taskMetrics)
       taskData.errorMessage = errorMessage

core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

Lines changed: 28 additions & 26 deletions
@@ -112,9 +112,9 @@ private[spark] object UIData {
   /**
    * These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
    */
-  class TaskUIData private(
-      private var _taskInfo: TaskInfo,
-      private var _metrics: Option[TaskMetricsUIData]) {
+  class TaskUIData private(private var _taskInfo: TaskInfo) {
+
+    private[this] var _metrics: Option[TaskMetricsUIData] = Some(TaskMetricsUIData.EMPTY)
 
     var errorMessage: Option[String] = None
 
@@ -127,7 +127,7 @@ private[spark] object UIData {
     }
 
     def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
-      _metrics = TaskUIData.toTaskMetricsUIData(metrics)
+      _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
     }
 
     def taskDuration: Option[Long] = {
@@ -140,28 +140,8 @@ private[spark] object UIData {
   }
 
   object TaskUIData {
-    def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = {
-      new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics))
-    }
-
-    private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = {
-      metrics.map { m =>
-        TaskMetricsUIData(
-          executorDeserializeTime = m.executorDeserializeTime,
-          executorDeserializeCpuTime = m.executorDeserializeCpuTime,
-          executorRunTime = m.executorRunTime,
-          executorCpuTime = m.executorCpuTime,
-          resultSize = m.resultSize,
-          jvmGCTime = m.jvmGCTime,
-          resultSerializationTime = m.resultSerializationTime,
-          memoryBytesSpilled = m.memoryBytesSpilled,
-          diskBytesSpilled = m.diskBytesSpilled,
-          peakExecutionMemory = m.peakExecutionMemory,
-          inputMetrics = InputMetricsUIData(m.inputMetrics),
-          outputMetrics = OutputMetricsUIData(m.outputMetrics),
-          shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
-          shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
-      }
+    def apply(taskInfo: TaskInfo): TaskUIData = {
+      new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
     }
 
     /**
@@ -206,6 +186,28 @@ private[spark] object UIData {
       shuffleReadMetrics: ShuffleReadMetricsUIData,
       shuffleWriteMetrics: ShuffleWriteMetricsUIData)
 
+  object TaskMetricsUIData {
+    def fromTaskMetrics(m: TaskMetrics): TaskMetricsUIData = {
+      TaskMetricsUIData(
+        executorDeserializeTime = m.executorDeserializeTime,
+        executorDeserializeCpuTime = m.executorDeserializeCpuTime,
+        executorRunTime = m.executorRunTime,
+        executorCpuTime = m.executorCpuTime,
+        resultSize = m.resultSize,
+        jvmGCTime = m.jvmGCTime,
+        resultSerializationTime = m.resultSerializationTime,
+        memoryBytesSpilled = m.memoryBytesSpilled,
+        diskBytesSpilled = m.diskBytesSpilled,
+        peakExecutionMemory = m.peakExecutionMemory,
+        inputMetrics = InputMetricsUIData(m.inputMetrics),
+        outputMetrics = OutputMetricsUIData(m.outputMetrics),
+        shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
+        shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
+    }
+
+    val EMPTY: TaskMetricsUIData = fromTaskMetrics(TaskMetrics.empty)
+  }
+
   case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
   object InputMetricsUIData {
     def apply(metrics: InputMetrics): InputMetricsUIData = {
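One design note on the diff above: routing updates through `Option.map` means the field-by-field `fromTaskMetrics` conversion only runs when a task actually reports metrics. The sketch below illustrates that shape with hypothetical, simplified stand-in types (`RawMetrics`, `UIMetrics`), not the real Spark classes.

```scala
// Hypothetical simplified stand-ins for TaskMetrics and TaskMetricsUIData,
// showing the Option-based update path used in the diff above.
final case class RawMetrics(executorRunTime: Long, jvmGCTime: Long)
final case class UIMetrics(executorRunTime: Long, jvmGCTime: Long)

object UIMetrics {
  // Field-by-field conversion; in the real code this is the step that
  // fromTaskMetrics performs and that should only run when needed.
  def fromRaw(m: RawMetrics): UIMetrics =
    UIMetrics(executorRunTime = m.executorRunTime, jvmGCTime = m.jvmGCTime)
}

object OptionMapDemo extends App {
  // The map body (and thus the conversion) runs only for Some(...).
  def update(maybeRaw: Option[RawMetrics]): Option[UIMetrics] =
    maybeRaw.map(UIMetrics.fromRaw)

  println(update(None))                      // None, no conversion work done
  println(update(Some(RawMetrics(42L, 3L)))) // Some(UIMetrics(42,3))
}
```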

core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ class AllStagesResourceSuite extends SparkFunSuite {
     val tasks = new LinkedHashMap[Long, TaskUIData]
     taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
       tasks(idx.toLong) = TaskUIData(
-        new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None)
+        new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false))
     }
 
     val stageUiData = new StageUIData()
