diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 312bcccb1cca..0487f2f07c09 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -20,6 +20,7 @@ package org.apache.spark.status import java.util.{List => JList} import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.status.api.v1 @@ -386,10 +387,9 @@ private[spark] class AppStatusStore( def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): Seq[v1.TaskData] = { val stageKey = Array(stageId, stageAttemptId) - store.view(classOf[TaskDataWrapper]).index("stage").first(stageKey).last(stageKey).reverse() - .max(maxTasks).asScala.map { taskDataWrapper => - constructTaskData(taskDataWrapper) - }.toSeq.reverse + val taskDataWrapperIter = store.view(classOf[TaskDataWrapper]).index("stage") + .first(stageKey).last(stageKey).reverse().max(maxTasks).asScala + constructTaskDataList(taskDataWrapperIter).reverse } def taskList( @@ -428,9 +428,8 @@ private[spark] class AppStatusStore( } val ordered = if (ascending) indexed else indexed.reverse() - ordered.skip(offset).max(length).asScala.map { taskDataWrapper => - constructTaskData(taskDataWrapper) - }.toSeq + val taskDataWrapperIter = ordered.skip(offset).max(length).asScala + constructTaskDataList(taskDataWrapperIter) } def executorSummary(stageId: Int, attemptId: Int): Map[String, v1.ExecutorStageSummary] = { @@ -536,24 +535,29 @@ private[spark] class AppStatusStore( store.close() } - def constructTaskData(taskDataWrapper: TaskDataWrapper) : v1.TaskData = { - val taskDataOld: v1.TaskData = taskDataWrapper.toApi - val executorLogs: Option[Map[String, String]] = try { - Some(executorSummary(taskDataOld.executorId).executorLogs) - } catch { - case e: NoSuchElementException => e.getMessage - None - } - new v1.TaskData(taskDataOld.taskId, taskDataOld.index, - taskDataOld.attempt, taskDataOld.launchTime, taskDataOld.resultFetchStart, - taskDataOld.duration, taskDataOld.executorId, taskDataOld.host, taskDataOld.status, - taskDataOld.taskLocality, taskDataOld.speculative, taskDataOld.accumulatorUpdates, - taskDataOld.errorMessage, taskDataOld.taskMetrics, - executorLogs.getOrElse(Map[String, String]()), - AppStatusUtils.schedulerDelay(taskDataOld), - AppStatusUtils.gettingResultTime(taskDataOld)) + def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]): Seq[v1.TaskData] = { + val executorIdToLogs = new HashMap[String, Map[String, String]]() + taskDataWrapperIter.map { taskDataWrapper => + val taskDataOld: v1.TaskData = taskDataWrapper.toApi + val executorLogs = executorIdToLogs.getOrElseUpdate(taskDataOld.executorId, { + try { + executorSummary(taskDataOld.executorId).executorLogs + } catch { + case e: NoSuchElementException => + Map.empty + } + }) + + new v1.TaskData(taskDataOld.taskId, taskDataOld.index, + taskDataOld.attempt, taskDataOld.launchTime, taskDataOld.resultFetchStart, + taskDataOld.duration, taskDataOld.executorId, taskDataOld.host, taskDataOld.status, + taskDataOld.taskLocality, taskDataOld.speculative, taskDataOld.accumulatorUpdates, + taskDataOld.errorMessage, taskDataOld.taskMetrics, + executorLogs, + AppStatusUtils.schedulerDelay(taskDataOld), + AppStatusUtils.gettingResultTime(taskDataOld)) + }.toSeq } - } private[spark] object AppStatusStore {