From bd627b8f10924e809c57c71ca7839c4593144dea Mon Sep 17 00:00:00 2001 From: Yewei Zhang Date: Wed, 18 Jul 2018 13:13:06 -0700 Subject: [PATCH] Add job group support to spark 2.3 --- .../apache/spark/status/AppStatusStore.scala | 12 + .../org/apache/spark/status/storeTypes.scala | 3 + .../apache/spark/ui/jobs/AllJobsPage.scala | 495 +----------------- .../apache/spark/ui/jobs/JobGroupPage.scala | 148 ++++++ .../apache/spark/ui/jobs/JobPagedTable.scala | 276 ++++++++++ .../org/apache/spark/ui/jobs/JobsTab.scala | 1 + .../org/apache/spark/ui/jobs/JobsUtils.scala | 272 ++++++++++ .../org/apache/spark/ui/UISeleniumSuite.scala | 42 ++ 8 files changed, 761 insertions(+), 488 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/JobPagedTable.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 688f25a9fdea1..f2fb91fc3321c 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -52,6 +52,18 @@ private[spark] class AppStatusStore( } } + def jobsInJobGroupList( + jobGroupId: String, + statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = { + val it = store.view(classOf[JobDataWrapper]).index("jobGroupId") + .first(jobGroupId).last(jobGroupId).asScala.map(_.info) + if (statuses != null && !statuses.isEmpty()) { + it.filter { job => statuses.contains(job.status) }.toSeq + } else { + it.toSeq + } + } + def job(jobId: Int): v1.JobData = { store.read(classOf[JobDataWrapper], jobId).info } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 646cf25880e37..8e844a5cfc885 100644 --- 
a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -73,6 +73,9 @@ private[spark] class JobDataWrapper( @JsonIgnore @KVIndex private def id: Int = info.jobId + @JsonIgnore @KVIndex("jobGroupId") + private def jobGroupId: String = info.jobGroup.getOrElse("") + @JsonIgnore @KVIndex("completionTime") private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index a1bc93e8f6781..8e2e42c2bc62e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -17,260 +17,20 @@ package org.apache.spark.ui.jobs -import java.net.URLEncoder -import java.util.Date import javax.servlet.http.HttpServletRequest -import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.xml._ -import org.apache.commons.lang3.StringEscapeUtils - import org.apache.spark.JobExecutionStatus import org.apache.spark.scheduler._ import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1 import org.apache.spark.ui._ -import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished jobs */ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") { - import ApiHelper._ - - private val JOBS_LEGEND = -
- - Succeeded - - Failed - - Running -
.toString.filter(_ != '\n') - - private val EXECUTORS_LEGEND = -
- - Added - - Removed -
.toString.filter(_ != '\n') - - private def makeJobEvent(jobs: Seq[v1.JobData]): Seq[String] = { - jobs.filter { job => - job.status != JobExecutionStatus.UNKNOWN && job.submissionTime.isDefined - }.map { job => - val jobId = job.jobId - val status = job.status - val (_, lastStageDescription) = lastStageNameAndDescription(store, job) - val jobDescription = UIUtils.makeDescription(lastStageDescription, "", plainText = true).text - - val submissionTime = job.submissionTime.get.getTime() - val completionTime = job.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis()) - val classNameByStatus = status match { - case JobExecutionStatus.SUCCEEDED => "succeeded" - case JobExecutionStatus.FAILED => "failed" - case JobExecutionStatus.RUNNING => "running" - case JobExecutionStatus.UNKNOWN => "unknown" - } - - // The timeline library treats contents as HTML, so we have to escape them. We need to add - // extra layers of escaping in order to embed this in a Javascript string literal. - val escapedDesc = Utility.escape(jobDescription) - val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc) - val jobEventJsonAsStr = - s""" - |{ - | 'className': 'job application-timeline-object ${classNameByStatus}', - | 'group': 'jobs', - | 'start': new Date(${submissionTime}), - | 'end': new Date(${completionTime}), - | 'content': '
Completed: ${UIUtils.formatDate(new Date(completionTime))}""" - } else { - "" - } - }">' + - | '${jsEscapedDesc} (Job ${jobId})
' - |} - """.stripMargin - jobEventJsonAsStr - } - } - - private def makeExecutorEvent(executors: Seq[v1.ExecutorSummary]): - Seq[String] = { - val events = ListBuffer[String]() - executors.foreach { e => - val addedEvent = - s""" - |{ - | 'className': 'executor added', - | 'group': 'executors', - | 'start': new Date(${e.addTime.getTime()}), - | 'content': '
Executor ${e.id} added
' - |} - """.stripMargin - events += addedEvent - - e.removeTime.foreach { removeTime => - val removedEvent = - s""" - |{ - | 'className': 'executor removed', - | 'group': 'executors', - | 'start': new Date(${removeTime.getTime()}), - | 'content': '
Reason: ${reason.replace("\n", " ")}""" - }.getOrElse("") - }"' + - | 'data-html="true">Executor ${e.id} removed
' - |} - """.stripMargin - events += removedEvent - } - } - events.toSeq - } - - private def makeTimeline( - jobs: Seq[v1.JobData], - executors: Seq[v1.ExecutorSummary], - startTime: Long): Seq[Node] = { - - val jobEventJsonAsStrSeq = makeJobEvent(jobs) - val executorEventJsonAsStrSeq = makeExecutorEvent(executors) - - val groupJsonArrayAsStr = - s""" - |[ - | { - | 'id': 'executors', - | 'content': '
Executors
${EXECUTORS_LEGEND}', - | }, - | { - | 'id': 'jobs', - | 'content': '
Jobs
${JOBS_LEGEND}', - | } - |] - """.stripMargin - - val eventArrayAsStr = - (jobEventJsonAsStrSeq ++ executorEventJsonAsStrSeq).mkString("[", ",", "]") - - - - - Event Timeline - - ++ - ++ - - } - - private def jobsTable( - request: HttpServletRequest, - tableHeaderId: String, - jobTag: String, - jobs: Seq[v1.JobData], - killEnabled: Boolean): Seq[Node] = { - // stripXSS is called to remove suspicious characters used in XSS attacks - val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) => - UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq - } - val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag)) - .map(para => para._1 + "=" + para._2(0)) - - val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined) - val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id" - - // stripXSS is called first to remove suspicious characters used in XSS attacks - val parameterJobPage = UIUtils.stripXSS(request.getParameter(jobTag + ".page")) - val parameterJobSortColumn = UIUtils.stripXSS(request.getParameter(jobTag + ".sort")) - val parameterJobSortDesc = UIUtils.stripXSS(request.getParameter(jobTag + ".desc")) - val parameterJobPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".pageSize")) - val parameterJobPrevPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".prevPageSize")) - - val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1) - val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn => - UIUtils.decodeURLParameter(sortColumn) - }.getOrElse(jobIdTitle) - val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse( - // New jobs should be shown above old jobs by default. 
- jobSortColumn == jobIdTitle - ) - val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100) - val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize) - - val page: Int = { - // If the user has changed to a larger page size, then go to page 1 in order to avoid - // IndexOutOfBoundsException. - if (jobPageSize <= jobPrevPageSize) { - jobPage - } else { - 1 - } - } - val currentTime = System.currentTimeMillis() - - try { - new JobPagedTable( - store, - jobs, - tableHeaderId, - jobTag, - UIUtils.prependBaseUri(parent.basePath), - "jobs", // subPath - parameterOtherTable, - killEnabled, - currentTime, - jobIdTitle, - pageSize = jobPageSize, - sortColumn = jobSortColumn, - desc = jobSortDesc - ).table(page) - } catch { - case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => -
-

Error while rendering job table:

-
-            {Utils.exceptionString(e)}
-          
-
- } - } - def render(request: HttpServletRequest): Seq[Node] = { val appInfo = store.applicationInfo() val startTime = appInfo.attempts.head.startTime.getTime() @@ -292,11 +52,14 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We } val activeJobsTable = - jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled) + JobsUtils.jobsTable(store, parent.basePath, request, "active", + "activeJob", activeJobs, killEnabled = parent.killEnabled) val completedJobsTable = - jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false) + JobsUtils.jobsTable(store, parent.basePath, request, "completed", + "completedJob", completedJobs, killEnabled = false) val failedJobsTable = - jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false) + JobsUtils.jobsTable(store, parent.basePath, request, "failed", + "failedJob", failedJobs, killEnabled = false) val shouldShowActiveJobs = activeJobs.nonEmpty val shouldShowCompletedJobs = completedJobs.nonEmpty @@ -363,7 +126,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We var content = summary - content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs, + content ++= JobsUtils.makeTimeline(store, activeJobs ++ completedJobs ++ failedJobs, store.executorList(false), startTime) if (shouldShowActiveJobs) { @@ -386,247 +149,3 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We } } - -private[ui] class JobTableRowData( - val jobData: v1.JobData, - val lastStageName: String, - val lastStageDescription: String, - val duration: Long, - val formattedDuration: String, - val submissionTime: Long, - val formattedSubmissionTime: String, - val jobDescription: NodeSeq, - val detailUrl: String) - -private[ui] class JobDataSource( - store: AppStatusStore, - jobs: Seq[v1.JobData], - basePath: String, - currentTime: Long, - pageSize: Int, - sortColumn: String, - desc: Boolean) extends 
PagedDataSource[JobTableRowData](pageSize) { - - import ApiHelper._ - - // Convert JobUIData to JobTableRowData which contains the final contents to show in the table - // so that we can avoid creating duplicate contents during sorting the data - private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc)) - - private var _slicedJobIds: Set[Int] = null - - override def dataSize: Int = data.size - - override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = { - val r = data.slice(from, to) - _slicedJobIds = r.map(_.jobData.jobId).toSet - r - } - - private def jobRow(jobData: v1.JobData): JobTableRowData = { - val duration: Option[Long] = { - jobData.submissionTime.map { start => - val end = jobData.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis()) - end - start.getTime() - } - } - val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") - val submissionTime = jobData.submissionTime - val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") - val (lastStageName, lastStageDescription) = lastStageNameAndDescription(store, jobData) - - val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false) - - val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId) - - new JobTableRowData( - jobData, - lastStageName, - lastStageDescription, - duration.getOrElse(-1), - formattedDuration, - submissionTime.map(_.getTime()).getOrElse(-1L), - formattedSubmissionTime, - jobDescription, - detailUrl - ) - } - - /** - * Return Ordering according to sortColumn and desc - */ - private def ordering(sortColumn: String, desc: Boolean): Ordering[JobTableRowData] = { - val ordering: Ordering[JobTableRowData] = sortColumn match { - case "Job Id" | "Job Id (Job Group)" => Ordering.by(_.jobData.jobId) - case "Description" => Ordering.by(x => (x.lastStageDescription, x.lastStageName)) - case "Submitted" => Ordering.by(_.submissionTime) - case 
"Duration" => Ordering.by(_.duration) - case "Stages: Succeeded/Total" | "Tasks (for all stages): Succeeded/Total" => - throw new IllegalArgumentException(s"Unsortable column: $sortColumn") - case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") - } - if (desc) { - ordering.reverse - } else { - ordering - } - } - -} - -private[ui] class JobPagedTable( - store: AppStatusStore, - data: Seq[v1.JobData], - tableHeaderId: String, - jobTag: String, - basePath: String, - subPath: String, - parameterOtherTable: Iterable[String], - killEnabled: Boolean, - currentTime: Long, - jobIdTitle: String, - pageSize: Int, - sortColumn: String, - desc: Boolean - ) extends PagedTable[JobTableRowData] { - val parameterPath = basePath + s"/$subPath/?" + parameterOtherTable.mkString("&") - - override def tableId: String = jobTag + "-table" - - override def tableCssClass: String = - "table table-bordered table-condensed table-striped " + - "table-head-clickable table-cell-width-limited" - - override def pageSizeFormField: String = jobTag + ".pageSize" - - override def prevPageSizeFormField: String = jobTag + ".prevPageSize" - - override def pageNumberFormField: String = jobTag + ".page" - - override val dataSource = new JobDataSource( - store, - data, - basePath, - currentTime, - pageSize, - sortColumn, - desc) - - override def pageLink(page: Int): String = { - val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") - parameterPath + - s"&$pageNumberFormField=$page" + - s"&$jobTag.sort=$encodedSortColumn" + - s"&$jobTag.desc=$desc" + - s"&$pageSizeFormField=$pageSize" + - s"#$tableHeaderId" - } - - override def goButtonFormPath: String = { - val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") - s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc#$tableHeaderId" - } - - override def headers: Seq[Node] = { - // Information for each header: title, cssClass, and sortable - val jobHeadersAndCssClasses: Seq[(String, String, 
Boolean)] = - Seq( - (jobIdTitle, "", true), - ("Description", "", true), ("Submitted", "", true), ("Duration", "", true), - ("Stages: Succeeded/Total", "", false), - ("Tasks (for all stages): Succeeded/Total", "", false) - ) - - if (!jobHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) { - throw new IllegalArgumentException(s"Unknown column: $sortColumn") - } - - val headerRow: Seq[Node] = { - jobHeadersAndCssClasses.map { case (header, cssClass, sortable) => - if (header == sortColumn) { - val headerLink = Unparsed( - parameterPath + - s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" + - s"&$jobTag.desc=${!desc}" + - s"&$jobTag.pageSize=$pageSize" + - s"#$tableHeaderId") - val arrow = if (desc) "▾" else "▴" // UP or DOWN - - - - {header} -  {Unparsed(arrow)} - - - - } else { - if (sortable) { - val headerLink = Unparsed( - parameterPath + - s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" + - s"&$jobTag.pageSize=$pageSize" + - s"#$tableHeaderId") - - - - {header} - - - } else { - - {header} - - } - } - } - } - {headerRow} - } - - override def row(jobTableRow: JobTableRowData): Seq[Node] = { - val job = jobTableRow.jobData - - val killLink = if (killEnabled) { - val confirm = - s"if (window.confirm('Are you sure you want to kill job ${job.jobId} ?')) " + - "{ this.parentNode.submit(); return true; } else { return false; }" - // SPARK-6846 this should be POST-only but YARN AM won't proxy POST - /* - val killLinkUri = s"$basePathUri/jobs/job/kill/" -
- - (kill) -
- */ - val killLinkUri = s"$basePath/jobs/job/kill/?id=${job.jobId}" - (kill) - } else { - Seq.empty - } - - - - {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")} - - - {jobTableRow.jobDescription} {killLink} - {jobTableRow.lastStageName} - - - {jobTableRow.formattedSubmissionTime} - - {jobTableRow.formattedDuration} - - {job.numCompletedStages}/{job.stageIds.size - job.numSkippedStages} - {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"} - {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"} - - - {UIUtils.makeProgressBar(started = job.numActiveTasks, - completed = job.numCompletedIndices, - failed = job.numFailedTasks, skipped = job.numSkippedTasks, - reasonToNumKilled = job.killedTasksSummary, total = job.numTasks - job.numSkippedTasks)} - - - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala new file mode 100644 index 0000000000000..20165990685af --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.jobs + +import javax.servlet.http.HttpServletRequest + +import scala.collection.mutable.ListBuffer +import scala.xml.{Node, NodeSeq} + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.api.v1 +import org.apache.spark.ui._ + +/** Page showing list of all ongoing and recently finished jobs belonging to a job group id */ +private[ui] class JobGroupPage(parent: JobsTab, store: AppStatusStore) + extends WebUIPage("jobgroup") { + + def render(request: HttpServletRequest): Seq[Node] = { + val parameterId = UIUtils.stripXSS(request.getParameter("id")) + require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") + + val jobGroupId = parameterId + + val activeJobs = new ListBuffer[v1.JobData]() + val completedJobs = new ListBuffer[v1.JobData]() + val failedJobs = new ListBuffer[v1.JobData]() + + var totalJobExecutionTime = 0L + store.jobsInJobGroupList(jobGroupId, null).foreach { job => + val duration: Option[Long] = { + job.submissionTime.map { start => + val end = job.completionTime.map(_.getTime).getOrElse(System.currentTimeMillis()) + end - start.getTime() + } + } + + totalJobExecutionTime += duration.getOrElse(0L) + + job.status match { + case JobExecutionStatus.SUCCEEDED => + completedJobs += job + case JobExecutionStatus.FAILED => + failedJobs += job + case _ => + activeJobs += job + } + } + + val activeJobsTable = + JobsUtils.jobsTable(store, parent.basePath, request, "active", + "activeJob", activeJobs, killEnabled = parent.killEnabled) + val completedJobsTable = + JobsUtils.jobsTable(store, parent.basePath, request, "completed", + "completedJob", completedJobs, killEnabled = false) + val failedJobsTable = + JobsUtils.jobsTable(store, parent.basePath, request, "failed", + "failedJob", failedJobs, killEnabled = false) + + val shouldShowActiveJobs = activeJobs.nonEmpty + val shouldShowCompletedJobs = completedJobs.nonEmpty + val 
shouldShowFailedJobs = failedJobs.nonEmpty + + val summary: NodeSeq = +
+ +
+ + var content = summary + val appStartTime = store.applicationInfo().attempts.head.startTime.getTime() + + content ++= JobsUtils.makeTimeline(store, activeJobs ++ completedJobs ++ failedJobs, + store.executorList(false), appStartTime) + + if (shouldShowActiveJobs) { + content ++=

Active Jobs ({activeJobs.size})

++ + activeJobsTable + } + if (shouldShowCompletedJobs) { + content ++=

Completed Jobs ({completedJobs.size})

++ + completedJobsTable + } + if (shouldShowFailedJobs) { + content ++=

Failed Jobs ({failedJobs.size})

++ + failedJobsTable + } + + val helpText = """A job is triggered by an action, like count() or saveAsTextFile().""" + + " Click on a job to see information about the stages of tasks inside it." + + UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText)) + } + +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPagedTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPagedTable.scala new file mode 100644 index 0000000000000..22b20abbf94cf --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPagedTable.scala @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.jobs + +import java.net.URLEncoder + +import scala.xml.{Node, NodeSeq, Unparsed} + +import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.api.v1 +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils} + +private[ui] class JobTableRowData( + val jobData: v1.JobData, + val lastStageName: String, + val lastStageDescription: String, + val duration: Long, + val formattedDuration: String, + val submissionTime: Long, + val formattedSubmissionTime: String, + val jobDescription: NodeSeq, + val detailUrl: String) + +private[ui] class JobDataSource( + store: AppStatusStore, + jobs: Seq[v1.JobData], + basePath: String, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) { + + import ApiHelper._ + + // Convert JobUIData to JobTableRowData which contains the final contents to show in the table + // so that we can avoid creating duplicate contents during sorting the data + private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc)) + + private var _slicedJobIds: Set[Int] = null + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = { + val r = data.slice(from, to) + _slicedJobIds = r.map(_.jobData.jobId).toSet + r + } + + private def jobRow(jobData: v1.JobData): JobTableRowData = { + val duration: Option[Long] = { + jobData.submissionTime.map { start => + val end = jobData.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis()) + end - start.getTime() + } + } + val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") + val submissionTime = jobData.submissionTime + val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") + val (lastStageName, lastStageDescription) = lastStageNameAndDescription(store, jobData) + + val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, 
plainText = false) + + val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId) + + new JobTableRowData( + jobData, + lastStageName, + lastStageDescription, + duration.getOrElse(-1), + formattedDuration, + submissionTime.map(_.getTime()).getOrElse(-1L), + formattedSubmissionTime, + jobDescription, + detailUrl + ) + } + + /** + * Return Ordering according to sortColumn and desc + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[JobTableRowData] = { + val ordering: Ordering[JobTableRowData] = sortColumn match { + case "Job Id" | "Job Id (Job Group)" => Ordering.by(_.jobData.jobId) + case "Description" => Ordering.by(x => (x.lastStageDescription, x.lastStageName)) + case "Submitted" => Ordering.by(_.submissionTime) + case "Duration" => Ordering.by(_.duration) + case "Stages: Succeeded/Total" | "Tasks (for all stages): Succeeded/Total" => + throw new IllegalArgumentException(s"Unsortable column: $sortColumn") + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } + +} + + +private[ui] class JobPagedTable( + store: AppStatusStore, + data: Seq[v1.JobData], + tableHeaderId: String, + jobTag: String, + basePath: String, + subPath: String, + parameterOtherTable: Iterable[String], + killEnabled: Boolean, + currentTime: Long, + jobIdTitle: String, + pageSize: Int, + sortColumn: String, + desc: Boolean + ) extends PagedTable[JobTableRowData] { + val parameterPath = basePath + s"/$subPath/?" 
+ parameterOtherTable.mkString("&") + + override def tableId: String = jobTag + "-table" + + override def tableCssClass: String = + "table table-bordered table-condensed table-striped " + + "table-head-clickable table-cell-width-limited" + + override def pageSizeFormField: String = jobTag + ".pageSize" + + override def prevPageSizeFormField: String = jobTag + ".prevPageSize" + + override def pageNumberFormField: String = jobTag + ".page" + + override val dataSource = new JobDataSource( + store, + data, + basePath, + currentTime, + pageSize, + sortColumn, + desc) + + override def pageLink(page: Int): String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$jobTag.sort=$encodedSortColumn" + + s"&$jobTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableHeaderId" + } + + override def goButtonFormPath: String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc#$tableHeaderId" + } + + override def headers: Seq[Node] = { + // Information for each header: title, cssClass, and sortable + val jobHeadersAndCssClasses: Seq[(String, String, Boolean)] = + Seq( + (jobIdTitle, "", true), + ("Description", "", true), ("Submitted", "", true), ("Duration", "", true), + ("Stages: Succeeded/Total", "", false), + ("Tasks (for all stages): Succeeded/Total", "", false) + ) + + if (!jobHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) { + throw new IllegalArgumentException(s"Unknown column: $sortColumn") + } + + val headerRow: Seq[Node] = { + jobHeadersAndCssClasses.map { case (header, cssClass, sortable) => + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$jobTag.desc=${!desc}" + + s"&$jobTag.pageSize=$pageSize" + + s"#$tableHeaderId") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + + + {header} +  
{Unparsed(arrow)} + + + + } else { + if (sortable) { + val headerLink = Unparsed( + parameterPath + + s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$jobTag.pageSize=$pageSize" + + s"#$tableHeaderId") + + + + {header} + + + } else { + + {header} + + } + } + } + } + {headerRow} + } + + override def row(jobTableRow: JobTableRowData): Seq[Node] = { + val job = jobTableRow.jobData + + val killLink = if (killEnabled) { + val confirm = + s"if (window.confirm('Are you sure you want to kill job ${job.jobId} ?')) " + + "{ this.parentNode.submit(); return true; } else { return false; }" + // SPARK-6846 this should be POST-only but YARN AM won't proxy POST + /* + val killLinkUri = s"$basePathUri/jobs/job/kill/" +
+ + (kill) +
+ */ + val killLinkUri = s"$basePath/jobs/job/kill/?id=${job.jobId}" + (kill) + } else { + Seq.empty + } + + + + {job.jobId} { job.jobGroup.map { id => + + {id} + + }.getOrElse({job.jobGroup.map(id => s"($id)").getOrElse("")})} + + + {jobTableRow.jobDescription} {killLink} + {jobTableRow.lastStageName} + + + {jobTableRow.formattedSubmissionTime} + + {jobTableRow.formattedDuration} + + {job.numCompletedStages}/{job.stageIds.size - job.numSkippedStages} + {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"} + {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"} + + + {UIUtils.makeProgressBar(started = job.numActiveTasks, + completed = job.numCompletedIndices, + failed = job.numFailedTasks, skipped = job.numSkippedTasks, + reasonToNumKilled = job.killedTasksSummary, total = job.numTasks - job.numSkippedTasks)} + + + } +} + diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index ff1b75e5c5065..b47ec912e7261 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -44,6 +44,7 @@ private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore) attachPage(new AllJobsPage(this, store)) attachPage(new JobPage(this, store)) + attachPage(new JobGroupPage(this, store)) def handleKillRequest(request: HttpServletRequest): Unit = { if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala new file mode 100644 index 0000000000000..705089750d630 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ui.jobs + +import java.util.Date +import javax.servlet.http.HttpServletRequest + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.xml.{Node, Unparsed, Utility} + +import org.apache.commons.lang3.StringEscapeUtils + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.internal.Logging +import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.api.v1 +import org.apache.spark.ui.{ToolTips, UIUtils} +import org.apache.spark.util.Utils + +private object JobsUtils extends Logging{ + import ApiHelper._ + + val JOBS_LEGEND = +
+ + Succeeded + + Failed + + Running +
.toString.filter(_ != '\n') + + val EXECUTORS_LEGEND = +
+ + Added + + Removed +
.toString.filter(_ != '\n') + + private def makeJobEvent(store: AppStatusStore, jobs: Seq[v1.JobData]): Seq[String] = { + jobs.filter { job => + job.status != JobExecutionStatus.UNKNOWN && job.submissionTime.isDefined + }.map { job => + val jobId = job.jobId + val status = job.status + val (_, lastStageDescription) = lastStageNameAndDescription(store, job) + val jobDescription = UIUtils.makeDescription(lastStageDescription, "", plainText = true).text + + val submissionTime = job.submissionTime.get.getTime() + val completionTime = job.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis()) + val classNameByStatus = status match { + case JobExecutionStatus.SUCCEEDED => "succeeded" + case JobExecutionStatus.FAILED => "failed" + case JobExecutionStatus.RUNNING => "running" + case JobExecutionStatus.UNKNOWN => "unknown" + } + + // The timeline library treats contents as HTML, so we have to escape them. We need to add + // extra layers of escaping in order to embed this in a Javascript string literal. + val escapedDesc = Utility.escape(jobDescription) + val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc) + val jobEventJsonAsStr = + s""" + |{ + | 'className': 'job application-timeline-object ${classNameByStatus}', + | 'group': 'jobs', + | 'start': new Date(${submissionTime}), + | 'end': new Date(${completionTime}), + | 'content': '
Completed: ${UIUtils.formatDate(new Date(completionTime))}""" + } else { + "" + } + }">' + + | '${jsEscapedDesc} (Job ${jobId})
' + |} + """.stripMargin + jobEventJsonAsStr + } + } + + def makeExecutorEvent(executors: Seq[v1.ExecutorSummary]): + Seq[String] = { + val events = ListBuffer[String]() + executors.foreach { e => + val addedEvent = + s""" + |{ + | 'className': 'executor added', + | 'group': 'executors', + | 'start': new Date(${e.addTime.getTime()}), + | 'content': '
Executor ${e.id} added
' + |} + """.stripMargin + events += addedEvent + + e.removeTime.foreach { removeTime => + val removedEvent = + s""" + |{ + | 'className': 'executor removed', + | 'group': 'executors', + | 'start': new Date(${removeTime.getTime()}), + | 'content': '
Reason: ${reason.replace("\n", " ")}""" + }.getOrElse("") + }"' + + | 'data-html="true">Executor ${e.id} removed
' + |} + """.stripMargin + events += removedEvent + } + } + events.toSeq + } + + def makeTimeline( + store: AppStatusStore, + jobs: Seq[v1.JobData], + executors: Seq[v1.ExecutorSummary], + startTime: Long): Seq[Node] = { + + val jobEventJsonAsStrSeq = makeJobEvent(store, jobs) + val executorEventJsonAsStrSeq = makeExecutorEvent(executors) + + val groupJsonArrayAsStr = + s""" + |[ + | { + | 'id': 'executors', + | 'content': '
Executors
${EXECUTORS_LEGEND}', + | }, + | { + | 'id': 'jobs', + | 'content': '
Jobs
${JOBS_LEGEND}', + | } + |] + """.stripMargin + + val eventArrayAsStr = + (jobEventJsonAsStrSeq ++ executorEventJsonAsStrSeq).mkString("[", ",", "]") + + + + + Event Timeline + + ++ + ++ + + } + + def jobsTable( + store: AppStatusStore, + basePath: String, + request: HttpServletRequest, + tableHeaderId: String, + jobTag: String, + jobs: Seq[v1.JobData], + killEnabled: Boolean): Seq[Node] = { + // stripXSS is called to remove suspicious characters used in XSS attacks + val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) => + UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq + } + val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag)) + .map(para => para._1 + "=" + para._2(0)) + + val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined) + val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id" + + // stripXSS is called first to remove suspicious characters used in XSS attacks + val parameterJobPage = UIUtils.stripXSS(request.getParameter(jobTag + ".page")) + val parameterJobSortColumn = UIUtils.stripXSS(request.getParameter(jobTag + ".sort")) + val parameterJobSortDesc = UIUtils.stripXSS(request.getParameter(jobTag + ".desc")) + val parameterJobPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".pageSize")) + val parameterJobPrevPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".prevPageSize")) + + val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1) + val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse(jobIdTitle) + val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse( + // New jobs should be shown above old jobs by default. 
+ jobSortColumn == jobIdTitle + ) + val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100) + val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize) + + val page: Int = { + // If the user has changed to a larger page size, then go to page 1 in order to avoid + // IndexOutOfBoundsException. + if (jobPageSize <= jobPrevPageSize) { + jobPage + } else { + 1 + } + } + val currentTime = System.currentTimeMillis() + + try { + new JobPagedTable( + store, + jobs, + tableHeaderId, + jobTag, + UIUtils.prependBaseUri(basePath), + "jobs", // subPath + parameterOtherTable, + killEnabled, + currentTime, + jobIdTitle, + pageSize = jobPageSize, + sortColumn = jobSortColumn, + desc = jobSortDesc + ).table(page) + } catch { + case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => +
+

Error while rendering job table:

+
+            {Utils.exceptionString(e)}
+          
+
+ } + } +} diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 0f20eea735044..02bf199e1ff30 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -723,6 +723,48 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } + test("jobs with job group id should show correct link to job group page on all jobs page") { + withSpark(newSparkContext()) { sc => + val ui = sc.ui.get + // Once at least one job has been run in a job group, then we should display the link to + // job group page + val jobGroupId = "my-job-group" + sc.setJobGroup(jobGroupId, "my-job-group-description") + // Create an RDD that involves multiple stages + // (its lineage contains two groupBy transformations): + val rdd = + sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity) + // Run a single job in the job group; the job group page check below expects exactly + // one completed job ("Completed Jobs (1)"): + rdd.count() + eventually(timeout(10 seconds), interval(50 milliseconds)) { + goToUi(ui, "/jobs") + findAll(cssSelector("tbody tr")).foreach { row => + val links = row.underlying.findElements(By.xpath(".//a")) + links.size should be (2) + links.get(0).getText().toLowerCase should include (jobGroupId) + links.get(0).getAttribute("href") should include regex ( + s"(?=.*jobgroup)(?=.*${jobGroupId})") + links.get(1).getText().toLowerCase should include ("count") + } + } + + eventually(timeout(10 seconds), interval(50 milliseconds)) { + goToUi(ui, s"/jobs/jobgroup/?id=${jobGroupId}") + find(id("pending")) should be (None) + find(id("active")) should be (None) + find(id("failed")) should be (None) + find(id("completed")).get.text should be ("Completed Jobs (1)") + findAll(cssSelector("tbody tr")).foreach { row => + val links = 
row.underlying.findElements(By.xpath(".//a")) + links.size should be (2) + links.get(0).getText().toLowerCase should include (jobGroupId) + links.get(1).getText().toLowerCase should include ("count") + } + } + } + } + def goToUi(sc: SparkContext, path: String): Unit = { goToUi(sc.ui.get, path) }