Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 15 additions & 34 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// if we find that it's okay.
private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)

private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)

private def getLocalitySummaryString(stageData: StageUIData): String = {
val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
val localityCounts = localities.groupBy(identity).mapValues(_.size)
Expand Down Expand Up @@ -252,15 +250,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Getting Result Time</span>
</span>
</li>
{if (displayPeakExecutionMemory) {
<li>
<span data-toggle="tooltip"
title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
<input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
<span class="additional-metric-title">Peak Execution Memory</span>
</span>
</li>
}}
<li>
<span data-toggle="tooltip"
title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
<input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
<span class="additional-metric-title">Peak Execution Memory</span>
</span>
</li>
</ul>
</div>
</div>
Expand Down Expand Up @@ -532,13 +528,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{serializationQuantiles}
</tr>,
<tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
if (displayPeakExecutionMemory) {
<tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
{peakExecutionMemoryQuantiles}
</tr>
} else {
Nil
},
<tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
{peakExecutionMemoryQuantiles}
</tr>,
if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
if (stageData.hasShuffleRead) {
Expand Down Expand Up @@ -1166,9 +1158,6 @@ private[ui] class TaskPagedTable(
desc: Boolean,
executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] {

// We only track peak memory used for unsafe operators
private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)

override def tableId: String = "task-table"

override def tableCssClass: String =
Expand Down Expand Up @@ -1217,14 +1206,8 @@ private[ui] class TaskPagedTable(
("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
("GC Time", ""),
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
{
if (displayPeakExecutionMemory) {
Seq(("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY))
} else {
Nil
}
} ++
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
{if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
{if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
{if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
Expand Down Expand Up @@ -1316,11 +1299,9 @@ private[ui] class TaskPagedTable(
<td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
{UIUtils.formatDuration(task.gettingResultTime)}
</td>
{if (displayPeakExecutionMemory) {
<td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
{Utils.bytesToString(task.peakExecutionMemoryUsed)}
</td>
}}
<td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
{Utils.bytesToString(task.peakExecutionMemoryUsed)}
</td>
{if (task.accumulators.nonEmpty) {
<td>{Unparsed(task.accumulators.get)}</td>
}}
Expand Down
16 changes: 3 additions & 13 deletions core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,15 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {

private val peakExecutionMemory = 10

test("peak execution memory only displayed if unsafe is enabled") {
val unsafeConf = "spark.sql.unsafe.enabled"
val conf = new SparkConf(false).set(unsafeConf, "true")
test("peak execution memory should displayed") {
val conf = new SparkConf(false)
val html = renderStagePage(conf).toString().toLowerCase
val targetString = "peak execution memory"
assert(html.contains(targetString))
// Disable unsafe and make sure it's not there
val conf2 = new SparkConf(false).set(unsafeConf, "false")
val html2 = renderStagePage(conf2).toString().toLowerCase
assert(!html2.contains(targetString))
// Avoid setting anything; it should be displayed by default
val conf3 = new SparkConf(false)
val html3 = renderStagePage(conf3).toString().toLowerCase
assert(html3.contains(targetString))
}

test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
val unsafeConf = "spark.sql.unsafe.enabled"
val conf = new SparkConf(false).set(unsafeConf, "true")
val conf = new SparkConf(false)
val html = renderStagePage(conf).toString().toLowerCase
// verify min/25/50/75/max show task value not cumulative values
assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))
Expand Down