@@ -130,48 +130,15 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
130130 override def onTaskEnd (taskEnd : SparkListenerTaskEnd ) = synchronized {
131131 val info = taskEnd.taskInfo
132132 if (info != null ) {
133- <<<<<<< HEAD
134- val emptyMap = HashMap [Long , AccumulableInfo ]()
135- val accumulables = stageIdToAccumulables.getOrElseUpdate(sid, emptyMap)
136- for (accumulableInfo <- info.accumulables) {
137- accumulables(accumulableInfo.id) = accumulableInfo
138- }
139-
140- // create executor summary map if necessary
141- val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
142- op = new HashMap [String , ExecutorSummary ]())
143- executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary )
144-
145- val executorSummary = executorSummaryMap.get(info.executorId)
146- executorSummary match {
147- case Some (y) => {
148- // first update failed-task, succeed-task
149- taskEnd.reason match {
150- case Success =>
151- y.succeededTasks += 1
152- case _ =>
153- y.failedTasks += 1
154- }
155-
156- // update duration
157- y.taskTime += info.duration
158-
159- val metrics = taskEnd.taskMetrics
160- if (metrics != null ) {
161- metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
162- metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
163- metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
164- y.memoryBytesSpilled += metrics.memoryBytesSpilled
165- y.diskBytesSpilled += metrics.diskBytesSpilled
166- }
167- }
168- case _ => {}
169- =======
170133 val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
171134 logWarning(" Task end for unknown stage " + taskEnd.stageId)
172135 new StageUIData
173136 })
174137
138+ for (accumulableInfo <- info.accumulables) {
139+ stageData.accumulables(accumulableInfo.id) = accumulableInfo
140+ }
141+
175142 val execSummaryMap = stageData.executorSummary
176143 val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary )
177144
@@ -180,7 +147,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
180147 execSummary.succeededTasks += 1
181148 case _ =>
182149 execSummary.failedTasks += 1
183- >>>>>>> apache/ master
184150 }
185151 execSummary.taskTime += info.duration
186152 stageData.numActiveTasks -= 1
0 commit comments