Commit 75bb19a

attilapiros authored and squito committed
[SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stage page

## What changes were proposed in this pull request?

Fixes the exception thrown when sorting tasks by Host / Executor ID:

```
java.lang.IllegalArgumentException: Invalid sort column: Host
  at org.apache.spark.ui.jobs.ApiHelper$.indexName(StagePage.scala:1017)
  at org.apache.spark.ui.jobs.TaskDataSource.sliceData(StagePage.scala:694)
  at org.apache.spark.ui.PagedDataSource.pageData(PagedTable.scala:61)
  at org.apache.spark.ui.PagedTable$class.table(PagedTable.scala:96)
  at org.apache.spark.ui.jobs.TaskPagedTable.table(StagePage.scala:708)
  at org.apache.spark.ui.jobs.StagePage.liftedTree1$1(StagePage.scala:293)
  at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:282)
  at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
  at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
  at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
  at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
  at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
  at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
  at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
```

Beyond the fix itself, the change refactors the code to prevent similar regressions: it introduces a constant for each header name and reuses those constants when resolving the corresponding sorting index.

## How was this patch tested?

Manually:

![screen shot 2018-02-13 at 18 57 10](https://user-images.githubusercontent.com/2017933/36166532-1cfdf3b8-10f3-11e8-8d32-5fcaad2af214.png)

(cherry picked from commit 1dc2c1d)

Author: “attilapiros” <[email protected]>

Closes #20623 from squito/fix_backport.
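The mismatch is easy to reproduce in isolation: the old `COLUMN_TO_INDEX` map keyed the combined header `"Executor ID / Host"` (visible in the `StagePage.scala` diff below), while the table by then rendered separate `Executor ID` and `Host` columns, so the lookup found no entry for either and threw. A minimal, self-contained sketch of that failure mode, with simplified names rather than the exact Spark source:

```scala
object SortColumnLookupSketch {
  // Stale map: keyed by a combined header that the table no longer renders.
  private val COLUMN_TO_INDEX: Map[String, String] =
    Map("Executor ID / Host" -> "exe")

  def indexName(sortColumn: String): Option[String] =
    COLUMN_TO_INDEX.get(sortColumn) match {
      case Some(v) => Option(v)
      case None =>
        throw new IllegalArgumentException(s"Invalid sort column: $sortColumn")
    }

  def main(args: Array[String]): Unit = {
    // "Host" is a rendered column header but not a map key, so this throws
    // the same IllegalArgumentException seen in the stack trace above.
    indexName("Host")
  }
}
```

Deriving both the rendered headers and the map keys from one set of constants, as this patch does, removes the possibility of the two drifting apart again.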
1 parent 0bd7765 · commit 75bb19a

3 files changed: +139, -47 lines


core/src/main/scala/org/apache/spark/status/storeTypes.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -109,6 +109,7 @@ private[spark] object TaskIndexNames {
   final val DURATION = "dur"
   final val ERROR = "err"
   final val EXECUTOR = "exe"
+  final val HOST = "hst"
   final val EXEC_CPU_TIME = "ect"
   final val EXEC_RUN_TIME = "ert"
   final val GC_TIME = "gc"
@@ -165,6 +166,7 @@ private[spark] class TaskDataWrapper(
     val duration: Long,
     @KVIndexParam(value = TaskIndexNames.EXECUTOR, parent = TaskIndexNames.STAGE)
     val executorId: String,
+    @KVIndexParam(value = TaskIndexNames.HOST, parent = TaskIndexNames.STAGE)
     val host: String,
     @KVIndexParam(value = TaskIndexNames.STATUS, parent = TaskIndexNames.STAGE)
     val status: String,
```
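`@KVIndexParam` is what makes a `TaskDataWrapper` field sortable: it registers a secondary index in the status store, and the UI pages through tasks by walking that index. The sketch below shows how the new `hst` index might be read back in sorted order; it follows the `org.apache.spark.util.kvstore` builder API (`index`/`parent`/`skip`/`max`) as I understand it, so treat the exact calls and the parent-key shape as assumptions rather than verified usage:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.status.TaskDataWrapper
import org.apache.spark.util.kvstore.KVStore

object TaskPagingSketch {
  // Hedged sketch: one page of a stage attempt's tasks, sorted by host.
  def tasksSortedByHost(
      store: KVStore,
      stageId: Int,
      attemptId: Int,
      offset: Int,
      pageSize: Int): Seq[TaskDataWrapper] = {
    val it = store.view(classOf[TaskDataWrapper])
      .index("hst")                       // the new TaskIndexNames.HOST index
      .parent(Array(stageId, attemptId))  // assumed stage-attempt parent key
      .skip(offset)
      .max(pageSize)
      .closeableIterator()
    try it.asScala.toList finally it.close()
  }
}
```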

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 75 additions & 46 deletions
```diff
@@ -742,37 +742,39 @@ private[ui] class TaskPagedTable(
   }
 
   def headers: Seq[Node] = {
+    import ApiHelper._
+
     val taskHeadersAndCssClasses: Seq[(String, String)] =
       Seq(
-        ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
-        ("Executor ID", ""), ("Host", ""), ("Launch Time", ""), ("Duration", ""),
-        ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
-        ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
-        ("GC Time", ""),
-        ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
-        ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
-        ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
-        {if (hasAccumulators(stage)) Seq(("Accumulators", "")) else Nil} ++
-        {if (hasInput(stage)) Seq(("Input Size / Records", "")) else Nil} ++
-        {if (hasOutput(stage)) Seq(("Output Size / Records", "")) else Nil} ++
+        (HEADER_TASK_INDEX, ""), (HEADER_ID, ""), (HEADER_ATTEMPT, ""), (HEADER_STATUS, ""),
+        (HEADER_LOCALITY, ""), (HEADER_EXECUTOR, ""), (HEADER_HOST, ""), (HEADER_LAUNCH_TIME, ""),
+        (HEADER_DURATION, ""), (HEADER_SCHEDULER_DELAY, TaskDetailsClassNames.SCHEDULER_DELAY),
+        (HEADER_DESER_TIME, TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
+        (HEADER_GC_TIME, ""),
+        (HEADER_SER_TIME, TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
+        (HEADER_GETTING_RESULT_TIME, TaskDetailsClassNames.GETTING_RESULT_TIME),
+        (HEADER_PEAK_MEM, TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
+        {if (hasAccumulators(stage)) Seq((HEADER_ACCUMULATORS, "")) else Nil} ++
+        {if (hasInput(stage)) Seq((HEADER_INPUT_SIZE, "")) else Nil} ++
+        {if (hasOutput(stage)) Seq((HEADER_OUTPUT_SIZE, "")) else Nil} ++
         {if (hasShuffleRead(stage)) {
-          Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
-            ("Shuffle Read Size / Records", ""),
-            ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
+          Seq((HEADER_SHUFFLE_READ_TIME, TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
+            (HEADER_SHUFFLE_TOTAL_READS, ""),
+            (HEADER_SHUFFLE_REMOTE_READS, TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
         } else {
           Nil
         }} ++
         {if (hasShuffleWrite(stage)) {
-          Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+          Seq((HEADER_SHUFFLE_WRITE_TIME, ""), (HEADER_SHUFFLE_WRITE_SIZE, ""))
         } else {
           Nil
         }} ++
         {if (hasBytesSpilled(stage)) {
-          Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
+          Seq((HEADER_MEM_SPILL, ""), (HEADER_DISK_SPILL, ""))
         } else {
           Nil
         }} ++
-        Seq(("Errors", ""))
+        Seq((HEADER_ERROR, ""))
 
     if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
       throw new IllegalArgumentException(s"Unknown column: $sortColumn")
@@ -953,35 +955,62 @@ private[ui] class TaskPagedTable(
   }
 }
 
-private object ApiHelper {
-
-
-  private val COLUMN_TO_INDEX = Map(
-    "ID" -> null.asInstanceOf[String],
-    "Index" -> TaskIndexNames.TASK_INDEX,
-    "Attempt" -> TaskIndexNames.ATTEMPT,
-    "Status" -> TaskIndexNames.STATUS,
-    "Locality Level" -> TaskIndexNames.LOCALITY,
-    "Executor ID / Host" -> TaskIndexNames.EXECUTOR,
-    "Launch Time" -> TaskIndexNames.LAUNCH_TIME,
-    "Duration" -> TaskIndexNames.DURATION,
-    "Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
-    "Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
-    "GC Time" -> TaskIndexNames.GC_TIME,
-    "Result Serialization Time" -> TaskIndexNames.SER_TIME,
-    "Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
-    "Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
-    "Accumulators" -> TaskIndexNames.ACCUMULATORS,
-    "Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
-    "Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
-    "Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
-    "Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
-    "Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
-    "Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
-    "Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
-    "Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
-    "Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
-    "Errors" -> TaskIndexNames.ERROR)
+private[ui] object ApiHelper {
+
+  val HEADER_ID = "ID"
+  val HEADER_TASK_INDEX = "Index"
+  val HEADER_ATTEMPT = "Attempt"
+  val HEADER_STATUS = "Status"
+  val HEADER_LOCALITY = "Locality Level"
+  val HEADER_EXECUTOR = "Executor ID"
+  val HEADER_HOST = "Host"
+  val HEADER_LAUNCH_TIME = "Launch Time"
+  val HEADER_DURATION = "Duration"
+  val HEADER_SCHEDULER_DELAY = "Scheduler Delay"
+  val HEADER_DESER_TIME = "Task Deserialization Time"
+  val HEADER_GC_TIME = "GC Time"
+  val HEADER_SER_TIME = "Result Serialization Time"
+  val HEADER_GETTING_RESULT_TIME = "Getting Result Time"
+  val HEADER_PEAK_MEM = "Peak Execution Memory"
+  val HEADER_ACCUMULATORS = "Accumulators"
+  val HEADER_INPUT_SIZE = "Input Size / Records"
+  val HEADER_OUTPUT_SIZE = "Output Size / Records"
+  val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time"
+  val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records"
+  val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads"
+  val HEADER_SHUFFLE_WRITE_TIME = "Write Time"
+  val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records"
+  val HEADER_MEM_SPILL = "Shuffle Spill (Memory)"
+  val HEADER_DISK_SPILL = "Shuffle Spill (Disk)"
+  val HEADER_ERROR = "Errors"
+
+  private[ui] val COLUMN_TO_INDEX = Map(
+    HEADER_ID -> null.asInstanceOf[String],
+    HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX,
+    HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT,
+    HEADER_STATUS -> TaskIndexNames.STATUS,
+    HEADER_LOCALITY -> TaskIndexNames.LOCALITY,
+    HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
+    HEADER_HOST -> TaskIndexNames.HOST,
+    HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME,
+    HEADER_DURATION -> TaskIndexNames.DURATION,
+    HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY,
+    HEADER_DESER_TIME -> TaskIndexNames.DESER_TIME,
+    HEADER_GC_TIME -> TaskIndexNames.GC_TIME,
+    HEADER_SER_TIME -> TaskIndexNames.SER_TIME,
+    HEADER_GETTING_RESULT_TIME -> TaskIndexNames.GETTING_RESULT_TIME,
+    HEADER_PEAK_MEM -> TaskIndexNames.PEAK_MEM,
+    HEADER_ACCUMULATORS -> TaskIndexNames.ACCUMULATORS,
+    HEADER_INPUT_SIZE -> TaskIndexNames.INPUT_SIZE,
+    HEADER_OUTPUT_SIZE -> TaskIndexNames.OUTPUT_SIZE,
+    HEADER_SHUFFLE_READ_TIME -> TaskIndexNames.SHUFFLE_READ_TIME,
+    HEADER_SHUFFLE_TOTAL_READS -> TaskIndexNames.SHUFFLE_TOTAL_READS,
+    HEADER_SHUFFLE_REMOTE_READS -> TaskIndexNames.SHUFFLE_REMOTE_READS,
+    HEADER_SHUFFLE_WRITE_TIME -> TaskIndexNames.SHUFFLE_WRITE_TIME,
+    HEADER_SHUFFLE_WRITE_SIZE -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
+    HEADER_MEM_SPILL -> TaskIndexNames.MEM_SPILL,
+    HEADER_DISK_SPILL -> TaskIndexNames.DISK_SPILL,
+    HEADER_ERROR -> TaskIndexNames.ERROR)
 
   def hasAccumulators(stageData: StageData): Boolean = {
     stageData.accumulatorUpdates.exists { acc => acc.name != null && acc.value != null }
```
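One detail worth a note: `HEADER_ID` maps to `null.asInstanceOf[String]` rather than to a `TaskIndexNames` constant, because the task ID is the store's natural key and needs no secondary index. A hedged sketch of how a lookup can keep the valid-column check while distinguishing the two cases (illustrative only, not the exact `ApiHelper.indexName` source):

```scala
def sortIndexFor(sortColumn: String): Option[String] =
  ApiHelper.COLUMN_TO_INDEX.get(sortColumn) match {
    case Some(null)  => None         // "ID": fall back to natural-key order
    case Some(index) => Some(index)  // any other column: use its index name
    case None =>  // header missing from the map: the bug this patch fixes
      throw new IllegalArgumentException(s"Invalid sort column: $sortColumn")
  }
```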

core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala

Lines changed: 62 additions & 1 deletion
```diff
@@ -28,13 +28,74 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, StageData, StageStatus}
 import org.apache.spark.status.config._
-import org.apache.spark.ui.jobs.{StagePage, StagesTab}
+import org.apache.spark.ui.jobs.{ApiHelper, StagePage, StagesTab, TaskPagedTable}
 
 class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
   private val peakExecutionMemory = 10
 
+  test("ApiHelper.COLUMN_TO_INDEX should match headers of the task table") {
+    val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+    val statusStore = AppStatusStore.createLiveStore(conf)
+    try {
+      val stageData = new StageData(
+        status = StageStatus.ACTIVE,
+        stageId = 1,
+        attemptId = 1,
+        numTasks = 1,
+        numActiveTasks = 1,
+        numCompleteTasks = 1,
+        numFailedTasks = 1,
+        numKilledTasks = 1,
+        numCompletedIndices = 1,
+
+        executorRunTime = 1L,
+        executorCpuTime = 1L,
+        submissionTime = None,
+        firstTaskLaunchedTime = None,
+        completionTime = None,
+        failureReason = None,
+
+        inputBytes = 1L,
+        inputRecords = 1L,
+        outputBytes = 1L,
+        outputRecords = 1L,
+        shuffleReadBytes = 1L,
+        shuffleReadRecords = 1L,
+        shuffleWriteBytes = 1L,
+        shuffleWriteRecords = 1L,
+        memoryBytesSpilled = 1L,
+        diskBytesSpilled = 1L,
+
+        name = "stage1",
+        description = Some("description"),
+        details = "detail",
+        schedulingPool = "pool1",
+
+        rddIds = Seq(1),
+        accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, "value")),
+        tasks = None,
+        executorSummary = None,
+        killedTasksSummary = Map.empty
+      )
+      val taskTable = new TaskPagedTable(
+        stageData,
+        basePath = "/a/b/c",
+        currentTime = 0,
+        pageSize = 10,
+        sortColumn = "Index",
+        desc = false,
+        store = statusStore
+      )
+      val columnNames = (taskTable.headers \ "th" \ "a").map(_.child(1).text).toSet
+      assert(columnNames === ApiHelper.COLUMN_TO_INDEX.keySet)
+    } finally {
+      statusStore.close()
+    }
+  }
+
   test("peak execution memory should displayed") {
     val html = renderStagePage().toString().toLowerCase(Locale.ROOT)
     val targetString = "peak execution memory"
```
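The assertion at the end of the new test leans on scala-xml selectors: `\ "th" \ "a"` picks every header link out of `taskTable.headers`, and `child(1)` takes each link's second child node, i.e. the header text that follows the sort-direction node. A tiny standalone demonstration of that selector pattern, where the markup is a simplified stand-in for the real header row:

```scala
import scala.xml.XML

object HeaderSelectorSketch extends App {
  // Simplified stand-in for the rendered task-table header row.
  val headers = XML.loadString(
    """<tr><th><a><span class="sort"/>Host</a></th>
      |<th><a><span class="sort"/>Status</a></th></tr>""".stripMargin)

  // Same shape as the test: all <a> under <th>, second child of each, text.
  val columnNames = (headers \ "th" \ "a").map(_.child(1).text).toSet
  println(columnNames)  // Set(Host, Status)
}
```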