2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -109,6 +109,7 @@ private[spark] object TaskIndexNames {
   final val DURATION = "dur"
   final val ERROR = "err"
   final val EXECUTOR = "exe"
+  final val HOST = "hst"
   final val EXEC_CPU_TIME = "ect"
   final val EXEC_RUN_TIME = "ert"
   final val GC_TIME = "gc"
@@ -165,6 +166,7 @@ private[spark] class TaskDataWrapper(
     val duration: Long,
     @KVIndexParam(value = TaskIndexNames.EXECUTOR, parent = TaskIndexNames.STAGE)
     val executorId: String,
+    @KVIndexParam(value = TaskIndexNames.HOST, parent = TaskIndexNames.STAGE)
     val host: String,
     @KVIndexParam(value = TaskIndexNames.STATUS, parent = TaskIndexNames.STAGE)
     val status: String,
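Note: the two hunks above belong together. `TaskIndexNames.HOST` names a new kvstore index, and the `@KVIndexParam` annotation on `host` (scoped to the `STAGE` parent index) asks the status store to maintain it, so the task table can page and sort by host within a single stage without scanning every task. Below is a minimal, self-contained sketch of what such a parent-scoped secondary index provides; the `TaskRow` and `HostIndex` names are illustrative only and this is not Spark's kvstore implementation.

// Illustrative stand-in for a parent-scoped secondary index (not Spark's kvstore API).
// The point of @KVIndexParam(value = HOST, parent = STAGE) is that "tasks of one stage,
// ordered by host" becomes a cheap scoped lookup instead of a full scan.
case class TaskRow(stageId: Int, attemptId: Int, taskId: Long, host: String)

class HostIndex {
  private var rows = Vector.empty[TaskRow]

  def put(row: TaskRow): Unit = rows :+= row

  // All tasks of one stage attempt, ordered by the indexed field (host).
  def tasksByHost(stageId: Int, attemptId: Int, ascending: Boolean = true): Seq[TaskRow] = {
    val scoped = rows.filter(r => r.stageId == stageId && r.attemptId == attemptId)
    val sorted = scoped.sortBy(_.host)
    if (ascending) sorted else sorted.reverse
  }
}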
121 changes: 75 additions & 46 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -742,37 +742,39 @@ private[ui] class TaskPagedTable(
   }
 
   def headers: Seq[Node] = {
+    import ApiHelper._
+
     val taskHeadersAndCssClasses: Seq[(String, String)] =
       Seq(
-        ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
-        ("Executor ID", ""), ("Host", ""), ("Launch Time", ""), ("Duration", ""),
-        ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
-        ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
-        ("GC Time", ""),
-        ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
-        ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
-        ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
-        {if (hasAccumulators(stage)) Seq(("Accumulators", "")) else Nil} ++
-        {if (hasInput(stage)) Seq(("Input Size / Records", "")) else Nil} ++
-        {if (hasOutput(stage)) Seq(("Output Size / Records", "")) else Nil} ++
+        (HEADER_TASK_INDEX, ""), (HEADER_ID, ""), (HEADER_ATTEMPT, ""), (HEADER_STATUS, ""),
+        (HEADER_LOCALITY, ""), (HEADER_EXECUTOR, ""), (HEADER_HOST, ""), (HEADER_LAUNCH_TIME, ""),
+        (HEADER_DURATION, ""), (HEADER_SCHEDULER_DELAY, TaskDetailsClassNames.SCHEDULER_DELAY),
+        (HEADER_DESER_TIME, TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
+        (HEADER_GC_TIME, ""),
+        (HEADER_SER_TIME, TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
+        (HEADER_GETTING_RESULT_TIME, TaskDetailsClassNames.GETTING_RESULT_TIME),
+        (HEADER_PEAK_MEM, TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
+        {if (hasAccumulators(stage)) Seq((HEADER_ACCUMULATORS, "")) else Nil} ++
+        {if (hasInput(stage)) Seq((HEADER_INPUT_SIZE, "")) else Nil} ++
+        {if (hasOutput(stage)) Seq((HEADER_OUTPUT_SIZE, "")) else Nil} ++
         {if (hasShuffleRead(stage)) {
-          Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
-            ("Shuffle Read Size / Records", ""),
-            ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
+          Seq((HEADER_SHUFFLE_READ_TIME, TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
+            (HEADER_SHUFFLE_TOTAL_READS, ""),
+            (HEADER_SHUFFLE_REMOTE_READS, TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
         } else {
           Nil
         }} ++
         {if (hasShuffleWrite(stage)) {
-          Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+          Seq((HEADER_SHUFFLE_WRITE_TIME, ""), (HEADER_SHUFFLE_WRITE_SIZE, ""))
         } else {
           Nil
         }} ++
         {if (hasBytesSpilled(stage)) {
-          Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
+          Seq((HEADER_MEM_SPILL, ""), (HEADER_DISK_SPILL, ""))
         } else {
           Nil
         }} ++
-        Seq(("Errors", ""))
+        Seq((HEADER_ERROR, ""))
 
     if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
       throw new IllegalArgumentException(s"Unknown column: $sortColumn")
@@ -953,35 +955,62 @@ private[ui] class TaskPagedTable(
   }
 }
 
-private object ApiHelper {
-
-
-  private val COLUMN_TO_INDEX = Map(
-    "ID" -> null.asInstanceOf[String],
-    "Index" -> TaskIndexNames.TASK_INDEX,
-    "Attempt" -> TaskIndexNames.ATTEMPT,
-    "Status" -> TaskIndexNames.STATUS,
-    "Locality Level" -> TaskIndexNames.LOCALITY,
-    "Executor ID / Host" -> TaskIndexNames.EXECUTOR,
-    "Launch Time" -> TaskIndexNames.LAUNCH_TIME,
-    "Duration" -> TaskIndexNames.DURATION,
-    "Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
-    "Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
-    "GC Time" -> TaskIndexNames.GC_TIME,
-    "Result Serialization Time" -> TaskIndexNames.SER_TIME,
-    "Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
-    "Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
-    "Accumulators" -> TaskIndexNames.ACCUMULATORS,
-    "Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
-    "Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
-    "Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
-    "Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
-    "Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
-    "Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
-    "Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
-    "Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
-    "Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
-    "Errors" -> TaskIndexNames.ERROR)
+private[ui] object ApiHelper {
+
+  val HEADER_ID = "ID"
+  val HEADER_TASK_INDEX = "Index"
+  val HEADER_ATTEMPT = "Attempt"
+  val HEADER_STATUS = "Status"
+  val HEADER_LOCALITY = "Locality Level"
+  val HEADER_EXECUTOR = "Executor ID"
+  val HEADER_HOST = "Host"
+  val HEADER_LAUNCH_TIME = "Launch Time"
+  val HEADER_DURATION = "Duration"
+  val HEADER_SCHEDULER_DELAY = "Scheduler Delay"
+  val HEADER_DESER_TIME = "Task Deserialization Time"
+  val HEADER_GC_TIME = "GC Time"
+  val HEADER_SER_TIME = "Result Serialization Time"
+  val HEADER_GETTING_RESULT_TIME = "Getting Result Time"
+  val HEADER_PEAK_MEM = "Peak Execution Memory"
+  val HEADER_ACCUMULATORS = "Accumulators"
+  val HEADER_INPUT_SIZE = "Input Size / Records"
+  val HEADER_OUTPUT_SIZE = "Output Size / Records"
+  val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time"
+  val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records"
+  val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads"
+  val HEADER_SHUFFLE_WRITE_TIME = "Write Time"
+  val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records"
+  val HEADER_MEM_SPILL = "Shuffle Spill (Memory)"
+  val HEADER_DISK_SPILL = "Shuffle Spill (Disk)"
+  val HEADER_ERROR = "Errors"
+
+  private[ui] val COLUMN_TO_INDEX = Map(
+    HEADER_ID -> null.asInstanceOf[String],
+    HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX,
+    HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT,
+    HEADER_STATUS -> TaskIndexNames.STATUS,
+    HEADER_LOCALITY -> TaskIndexNames.LOCALITY,
+    HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
+    HEADER_HOST -> TaskIndexNames.HOST,
+    HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME,
+    HEADER_DURATION -> TaskIndexNames.DURATION,
+    HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY,
+    HEADER_DESER_TIME -> TaskIndexNames.DESER_TIME,
+    HEADER_GC_TIME -> TaskIndexNames.GC_TIME,
+    HEADER_SER_TIME -> TaskIndexNames.SER_TIME,
+    HEADER_GETTING_RESULT_TIME -> TaskIndexNames.GETTING_RESULT_TIME,
+    HEADER_PEAK_MEM -> TaskIndexNames.PEAK_MEM,
+    HEADER_ACCUMULATORS -> TaskIndexNames.ACCUMULATORS,
+    HEADER_INPUT_SIZE -> TaskIndexNames.INPUT_SIZE,
+    HEADER_OUTPUT_SIZE -> TaskIndexNames.OUTPUT_SIZE,
+    HEADER_SHUFFLE_READ_TIME -> TaskIndexNames.SHUFFLE_READ_TIME,
+    HEADER_SHUFFLE_TOTAL_READS -> TaskIndexNames.SHUFFLE_TOTAL_READS,
+    HEADER_SHUFFLE_REMOTE_READS -> TaskIndexNames.SHUFFLE_REMOTE_READS,
+    HEADER_SHUFFLE_WRITE_TIME -> TaskIndexNames.SHUFFLE_WRITE_TIME,
+    HEADER_SHUFFLE_WRITE_SIZE -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
+    HEADER_MEM_SPILL -> TaskIndexNames.MEM_SPILL,
+    HEADER_DISK_SPILL -> TaskIndexNames.DISK_SPILL,
+    HEADER_ERROR -> TaskIndexNames.ERROR)
 
   def hasAccumulators(stageData: StageData): Boolean = {
     stageData.accumulatorUpdates.exists { acc => acc.name != null && acc.value != null }
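Note: `COLUMN_TO_INDEX` is what turns a sort column chosen in the UI into a kvstore index name, with `HEADER_ID -> null` meaning "fall back to the primary (task ID) ordering". In the old map the only executor-related key was "Executor ID / Host", which never matched the "Executor ID" and "Host" headers actually rendered, so those sort columns could not be resolved to an index; the per-column entries plus the new HOST index remove that mismatch. The sketch below is a trimmed-down stand-in for the lookup, with a hypothetical `indexFor` helper and only a few entries reproduced.

// Hypothetical helper showing how a header string resolves to an index name.
// Only a handful of entries are shown; see ApiHelper.COLUMN_TO_INDEX above.
val columnToIndex: Map[String, String] = Map(
  "ID" -> null.asInstanceOf[String],  // null = use the natural (task ID) ordering
  "Host" -> "hst",
  "Duration" -> "dur",
  "Errors" -> "err")

def indexFor(sortColumn: String): Option[String] =
  columnToIndex.get(sortColumn) match {
    case Some(index) => Option(index)  // Option(null) becomes None
    case None => throw new IllegalArgumentException(s"Unknown column: $sortColumn")
  }

// indexFor("Host") == Some("hst"); indexFor("ID") == None; an unknown header throws.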
63 changes: 62 additions & 1 deletion core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -28,13 +28,74 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, StageData, StageStatus}
 import org.apache.spark.status.config._
-import org.apache.spark.ui.jobs.{StagePage, StagesTab}
+import org.apache.spark.ui.jobs.{ApiHelper, StagePage, StagesTab, TaskPagedTable}
 
 class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
   private val peakExecutionMemory = 10
 
+  test("ApiHelper.COLUMN_TO_INDEX should match headers of the task table") {
+    val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+    val statusStore = AppStatusStore.createLiveStore(conf)
+    try {
+      val stageData = new StageData(
+        status = StageStatus.ACTIVE,
+        stageId = 1,
+        attemptId = 1,
+        numTasks = 1,
+        numActiveTasks = 1,
+        numCompleteTasks = 1,
+        numFailedTasks = 1,
+        numKilledTasks = 1,
+        numCompletedIndices = 1,
+
+        executorRunTime = 1L,
+        executorCpuTime = 1L,
+        submissionTime = None,
+        firstTaskLaunchedTime = None,
+        completionTime = None,
+        failureReason = None,
+
+        inputBytes = 1L,
+        inputRecords = 1L,
+        outputBytes = 1L,
+        outputRecords = 1L,
+        shuffleReadBytes = 1L,
+        shuffleReadRecords = 1L,
+        shuffleWriteBytes = 1L,
+        shuffleWriteRecords = 1L,
+        memoryBytesSpilled = 1L,
+        diskBytesSpilled = 1L,
+
+        name = "stage1",
+        description = Some("description"),
+        details = "detail",
+        schedulingPool = "pool1",
+
+        rddIds = Seq(1),
+        accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, "value")),
+        tasks = None,
+        executorSummary = None,
+        killedTasksSummary = Map.empty
+      )
+      val taskTable = new TaskPagedTable(
+        stageData,
+        basePath = "/a/b/c",
+        currentTime = 0,
+        pageSize = 10,
+        sortColumn = "Index",
+        desc = false,
+        store = statusStore
+      )
+      val columnNames = (taskTable.headers \ "th" \ "a").map(_.child(1).text).toSet
+      assert(columnNames === ApiHelper.COLUMN_TO_INDEX.keySet)
+    } finally {
+      statusStore.close()
+    }
+  }
+
   test("peak execution memory should displayed") {
     val html = renderStagePage().toString().toLowerCase(Locale.ROOT)
     val targetString = "peak execution memory"
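Note: the new test renders the table headers against a minimal StageData and asserts that the link text of every <th> equals ApiHelper.COLUMN_TO_INDEX.keySet, so a header added to the table without a matching index entry (or the other way round) now fails the suite. The snippet below is a simplified, self-contained illustration of the `\` projection used in that assertion; the markup is a stand-in for what TaskPagedTable.headers actually renders (the real link nodes include surrounding whitespace and a sort indicator, hence `child(1)` in the test rather than `.text`).

import scala.xml.NodeSeq

// Stand-in markup; the real headers come from TaskPagedTable.headers.
val headers: NodeSeq =
  <tr>
    <th><a href="?sort=Index">Index</a></th>
    <th><a href="?sort=Host">Host</a></th>
  </tr>

// XPath-like projection: every <a> under every <th>, then its text.
val columnNames = (headers \ "th" \ "a").map(_.text).toSet
assert(columnNames == Set("Index", "Host"))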