diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8c7cefe200739..412ddfa9fad35 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -70,8 +70,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
   // if we find that it's okay.
   private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
 
-  private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)
-
   private def getLocalitySummaryString(stageData: StageUIData): String = {
     val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
     val localityCounts = localities.groupBy(identity).mapValues(_.size)
@@ -252,15 +250,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
                 <span class="additional-metric-title">Getting Result Time</span>
               </span>
             </li>
-            {if (displayPeakExecutionMemory) {
-              <li>
-                <span data-toggle="tooltip"
-                      title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
-                  <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
-                  <span class="additional-metric-title">Peak Execution Memory</span>
-                </span>
-              </li>
-            }}
+            <li>
+              <span data-toggle="tooltip"
+                    title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
+                <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
+                <span class="additional-metric-title">Peak Execution Memory</span>
+              </span>
+            </li>
           </ul>
         </div>
       </div>
@@ -532,13 +528,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
           {serializationQuantiles}
         </tr>,
         <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
-        if (displayPeakExecutionMemory) {
-          <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-            {peakExecutionMemoryQuantiles}
-          </tr>
-        } else {
-          Nil
-        },
+        <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+          {peakExecutionMemoryQuantiles}
+        </tr>,
         if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
         if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
         if (stageData.hasShuffleRead) {
@@ -1166,9 +1158,6 @@ private[ui] class TaskPagedTable(
     desc: Boolean,
     executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] {
 
-  // We only track peak memory used for unsafe operators
-  private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)
-
   override def tableId: String = "task-table"
 
   override def tableCssClass: String =
@@ -1217,14 +1206,8 @@ private[ui] class TaskPagedTable(
         ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
         ("GC Time", ""),
         ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
-        ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
-        {
-          if (displayPeakExecutionMemory) {
-            Seq(("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY))
-          } else {
-            Nil
-          }
-        } ++
+        ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
+        ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
         {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
         {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
         {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
@@ -1316,11 +1299,9 @@ private[ui] class TaskPagedTable(
       <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
         {UIUtils.formatDuration(task.gettingResultTime)}
       </td>
-      {if (displayPeakExecutionMemory) {
-        <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-          {Utils.bytesToString(task.peakExecutionMemoryUsed)}
-        </td>
-      }}
+      <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+        {Utils.bytesToString(task.peakExecutionMemoryUsed)}
+      </td>
       {if (task.accumulators.nonEmpty) {
         <td>{Unparsed(task.accumulators.get)}</td>
       }}
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index d30b987d6ca31..11482d187aeca 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -35,25 +35,15 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
   private val peakExecutionMemory = 10
 
-  test("peak execution memory only displayed if unsafe is enabled") {
-    val unsafeConf = "spark.sql.unsafe.enabled"
-    val conf = new SparkConf(false).set(unsafeConf, "true")
+  test("peak execution memory should be displayed") {
+    val conf = new SparkConf(false)
     val html = renderStagePage(conf).toString().toLowerCase
     val targetString = "peak execution memory"
     assert(html.contains(targetString))
-    // Disable unsafe and make sure it's not there
-    val conf2 = new SparkConf(false).set(unsafeConf, "false")
-    val html2 = renderStagePage(conf2).toString().toLowerCase
-    assert(!html2.contains(targetString))
-    // Avoid setting anything; it should be displayed by default
-    val conf3 = new SparkConf(false)
-    val html3 = renderStagePage(conf3).toString().toLowerCase
-    assert(html3.contains(targetString))
   }
 
   test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
-    val unsafeConf = "spark.sql.unsafe.enabled"
-    val conf = new SparkConf(false).set(unsafeConf, "true")
+    val conf = new SparkConf(false)
     val html = renderStagePage(conf).toString().toLowerCase
     // verify min/25/50/75/max show task value not cumulative values
     assert(html.contains(s"$peakExecutionMemory.0 b" * 5))
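For readers skimming the patch, the shape of the change in `TaskPagedTable.headers` is: drop the config-gated branch and fold "Peak Execution Memory" into the fixed column list, leaving only the data-dependent columns behind per-stage flags. A minimal sketch of that pattern, runnable on its own (the flag values and css-class strings below are illustrative stand-ins for the real per-stage fields and `TaskDetailsClassNames` constants):

```scala
// Sketch of the header construction after this patch: fixed columns,
// including Peak Execution Memory, are listed unconditionally; optional
// columns are appended only when the stage actually has that data.
object HeaderPatternSketch {
  val hasAccumulators = true  // stand-in for the per-stage flag
  val hasInput = false        // stand-in for the per-stage flag

  val headers: Seq[(String, String)] =
    Seq(
      ("Getting Result Time", "getting_result_time"),
      ("Peak Execution Memory", "peak_execution_memory")) ++  // no conf gate
    (if (hasAccumulators) Seq(("Accumulators", "")) else Nil) ++
    (if (hasInput) Seq(("Input Size / Records", "")) else Nil)

  def main(args: Array[String]): Unit =
    headers.foreach { case (name, css) => println(s"$name [$css]") }
}
```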