Skip to content

Commit 77a69ae

Browse files
committed
Refactor codes as per TD's comments
1 parent 35ffd80 commit 77a69ae

File tree

3 files changed

+158
-101
lines changed

3 files changed

+158
-101
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,19 @@ class Job(val time: Time, func: () => _) {
4141
_result
4242
}
4343

44+
/**
45+
* @return the globally unique id of this Job.
46+
*/
4447
def id: String = {
4548
if (!isSet) {
4649
throw new IllegalStateException("Cannot access id before calling setId")
4750
}
4851
_id
4952
}
5053

54+
/**
55+
* @return the output op id of this Job. Each Job has a unique output op id in the same JobSet.
56+
*/
5157
def outputOpId: Int = {
5258
if (!isSet) {
5359
throw new IllegalStateException("Cannot access number before calling setId")
@@ -60,7 +66,7 @@ class Job(val time: Time, func: () => _) {
6066
throw new IllegalStateException("Cannot call setOutputOpId more than once")
6167
}
6268
isSet = true
63-
_id = "streaming job " + time + "." + outputOpId
69+
_id = s"streaming job $time.$outputOpId"
6470
_outputOpId = outputOpId
6571
}
6672

streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala

Lines changed: 135 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,24 @@ package org.apache.spark.streaming.ui
1919

2020
import javax.servlet.http.HttpServletRequest
2121

22+
import scala.collection.mutable.{ArrayBuffer, Map}
23+
import scala.xml.{NodeSeq, Node}
24+
2225
import org.apache.commons.lang3.StringEscapeUtils
26+
2327
import org.apache.spark.streaming.Time
28+
import org.apache.spark.streaming.scheduler.BatchInfo
2429
import org.apache.spark.ui.{UIUtils, WebUIPage}
25-
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{JobId, OutputOpId}
30+
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
2631
import org.apache.spark.ui.jobs.UIData.JobUIData
2732

28-
import scala.xml.{NodeSeq, Node}
33+
private[ui] case class BatchUIData(
34+
var batchInfo: BatchInfo = null,
35+
outputOpIdToSparkJobIds: Map[OutputOpId, ArrayBuffer[SparkJobId]] = Map()) {
36+
}
2937

30-
class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
31-
private val streaminglistener = parent.listener
38+
private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
39+
private val streamingListener = parent.listener
3240
private val sparkListener = parent.ssc.sc.jobProgressListener
3341

3442
private def columns: Seq[Node] = {
@@ -42,75 +50,101 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
4250
<th>Error</th>
4351
}
4452

45-
private def makeOutputOpIdRow(outputOpId: OutputOpId, jobs: Seq[JobUIData]): Seq[Node] = {
46-
val jobDurations = jobs.map(job => {
47-
job.submissionTime.map { start =>
48-
val end = job.completionTime.getOrElse(System.currentTimeMillis())
49-
end - start
50-
}
51-
})
52-
val formattedOutputOpDuration =
53-
if (jobDurations.exists(_ == None)) {
54-
// If any job does not finish, set "formattedOutputOpDuration" to "-"
55-
"-"
56-
} else {
57-
UIUtils.formatDuration(jobDurations.flatMap(x => x).sum)
58-
}
53+
/**
54+
* Generate a row for a Spark Job. Because duplicated output op info needs to be collapsed into
55+
* one cell, we use "rowspan" for the first row of an output op.
56+
*/
57+
def generateJobRow(
58+
outputOpId: OutputOpId,
59+
formattedOutputOpDuration: String,
60+
numSparkJobRowsInOutputOp: Int,
61+
isFirstRow: Boolean,
62+
sparkJob: JobUIData): Seq[Node] = {
63+
val lastStageInfo = Option(sparkJob.stageIds)
64+
.filter(_.nonEmpty)
65+
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
66+
val lastStageData = lastStageInfo.flatMap { s =>
67+
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
68+
}
5969

60-
def makeJobRow(job: JobUIData, isFirstRow: Boolean): Seq[Node] = {
61-
val lastStageInfo = Option(job.stageIds)
62-
.filter(_.nonEmpty)
63-
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
64-
val lastStageData = lastStageInfo.flatMap { s =>
65-
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
70+
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
71+
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
72+
val duration: Option[Long] = {
73+
sparkJob.submissionTime.map { start =>
74+
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
75+
end - start
6676
}
77+
}
78+
val lastFailureReason =
79+
sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
80+
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
81+
flatMap(info => info.failureReason).headOption.getOrElse("")
82+
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
83+
val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
6784

68-
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
69-
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
70-
val duration: Option[Long] = {
71-
job.submissionTime.map { start =>
72-
val end = job.completionTime.getOrElse(System.currentTimeMillis())
73-
end - start
74-
}
75-
}
76-
val lastFailureReason = job.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
77-
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
78-
flatMap(info => info.failureReason).headOption.getOrElse("")
79-
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
80-
val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${job.jobId}"
81-
<tr>
82-
{if(isFirstRow) {
83-
<td rowspan={jobs.size.toString}>{outputOpId}</td>
84-
<td rowspan={jobs.size.toString}>
85+
// In the first row, output op id and its information need to be shown. In other rows, these
86+
// cells will be taken up due to "rowspan".
87+
val prefixCells =
88+
if (isFirstRow) {
89+
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
90+
<td rowspan={numSparkJobRowsInOutputOp.toString}>
8591
<span class="description-input" title={lastStageDescription}>
8692
{lastStageDescription}
8793
</span>{lastStageName}
8894
</td>
89-
<td rowspan={jobs.size.toString}>{formattedOutputOpDuration}</td>}
95+
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
96+
} else {
97+
Nil
98+
}
99+
100+
<tr>
101+
{prefixCells}
102+
<td sorttable_customkey={sparkJob.jobId.toString}>
103+
<a href={detailUrl}>
104+
{sparkJob.jobId}{sparkJob.jobGroup.map(id => s"($id)").getOrElse("")}
105+
</a>
106+
</td>
107+
<td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
108+
{formattedDuration}
109+
</td>
110+
<td class="stage-progress-cell">
111+
{sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
112+
{if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
113+
{if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
114+
</td>
115+
<td class="progress-cell">
116+
{
117+
UIUtils.makeProgressBar(
118+
started = sparkJob.numActiveTasks,
119+
completed = sparkJob.numCompletedTasks,
120+
failed = sparkJob.numFailedTasks,
121+
skipped = sparkJob.numSkippedTasks,
122+
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
90123
}
91-
<td sorttable_customkey={job.jobId.toString}>
92-
<a href={detailUrl}>
93-
{job.jobId}{job.jobGroup.map(id => s"($id)").getOrElse("")}
94-
</a>
95-
</td>
96-
<td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
97-
{formattedDuration}
98-
</td>
99-
<td class="stage-progress-cell">
100-
{job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
101-
{if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
102-
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
103-
</td>
104-
<td class="progress-cell">
105-
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
106-
failed = job.numFailedTasks, skipped = job.numSkippedTasks,
107-
total = job.numTasks - job.numSkippedTasks)}
108-
</td>
109-
{failureReasonCell(lastFailureReason)}
110-
</tr>
111-
}
124+
</td>
125+
{failureReasonCell(lastFailureReason)}
126+
</tr>
127+
}
112128

113-
makeJobRow(jobs.head, true) ++ (jobs.tail.map(job => makeJobRow(job, false)).flatMap(x => x))
129+
private def generateOutputOpIdRow(
130+
outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
131+
val sparkjobDurations = sparkJobs.map(sparkJob => {
132+
sparkJob.submissionTime.map { start =>
133+
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
134+
end - start
135+
}
136+
})
137+
val formattedOutputOpDuration =
138+
if (sparkjobDurations.exists(_ == None)) {
139+
// If any job does not finish, set "formattedOutputOpDuration" to "-"
140+
"-"
141+
} else {
142+
UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
143+
}
144+
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
145+
sparkJobs.tail.map { sparkJob =>
146+
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
147+
}.flatMap(x => x)
114148
}
115149

116150
private def failureReasonCell(failureReason: String): Seq[Node] = {
@@ -138,34 +172,41 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
138172
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
139173
}
140174

141-
private def jobsTable(jobInfos: Seq[(OutputOpId, JobId)]): Seq[Node] = {
142-
def getJobData(jobId: JobId): Option[JobUIData] = {
143-
sparkListener.activeJobs.get(jobId).orElse {
144-
sparkListener.completedJobs.find(_.jobId == jobId).orElse {
145-
sparkListener.failedJobs.find(_.jobId == jobId)
146-
}
175+
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
176+
sparkListener.activeJobs.get(sparkJobId).orElse {
177+
sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
178+
sparkListener.failedJobs.find(_.jobId == sparkJobId)
147179
}
148180
}
181+
}
149182

150-
// Group jobInfos by OutputOpId firstly, then sort them.
151-
// E.g., [(0, 1), (1, 3), (0, 2), (1, 4)] => [(0, [1, 2]), (1, [3, 4])]
152-
val outputOpIdWithJobIds: Seq[(OutputOpId, Seq[JobId])] =
153-
jobInfos.groupBy(_._1).toSeq.sortBy(_._1). // sorted by OutputOpId
183+
/**
184+
* Generate the job table for the batch.
185+
*/
186+
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
187+
val outputOpIdWithSparkJobIds: Seq[(OutputOpId, Seq[SparkJobId])] = {
188+
batchUIData.outputOpIdToSparkJobIds.toSeq.sortBy(_._1). // sorted by OutputOpId
154189
map { case (outputOpId, jobs) =>
155-
(outputOpId, jobs.map(_._2).sortBy(x => x).toSeq)} // sort JobIds for each OutputOpId
190+
(outputOpId, jobs.sorted.toSeq) // sort JobIds for each OutputOpId
191+
}
192+
}
156193
sparkListener.synchronized {
157-
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithJobIds.map {
158-
case (outputOpId, jobIds) =>
159-
// Filter out JobIds that don't exist in sparkListener
160-
(outputOpId, jobIds.flatMap(getJobData))
194+
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithSparkJobIds.map {
195+
case (outputOpId, sparkJobIds) =>
196+
// Filter out spark Job ids that don't exist in sparkListener
197+
(outputOpId, sparkJobIds.flatMap(getJobData))
161198
}
162199

163200
<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
164201
<thead>
165202
{columns}
166203
</thead>
167204
<tbody>
168-
{outputOpIdWithJobs.map { case (outputOpId, jobs) => makeOutputOpIdRow(outputOpId, jobs)}}
205+
{
206+
outputOpIdWithJobs.map {
207+
case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
208+
}
209+
}
169210
</tbody>
170211
</table>
171212
}
@@ -176,13 +217,11 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
176217
throw new IllegalArgumentException(s"Missing id parameter")
177218
}
178219
val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
179-
val (batchInfo, jobInfos) = streaminglistener.synchronized {
180-
val _batchInfo = streaminglistener.getBatchInfo(batchTime).getOrElse {
181-
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
182-
}
183-
val _jobInfos = streaminglistener.getJobInfos(batchTime)
184-
(_batchInfo, _jobInfos)
220+
221+
val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
222+
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
185223
}
224+
val batchInfo = batchUIData.batchInfo
186225

187226
val formattedSchedulingDelay =
188227
batchInfo.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
@@ -195,7 +234,7 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
195234
<ul class="unstyled">
196235
<li>
197236
<strong>Batch Duration: </strong>
198-
{UIUtils.formatDuration(streaminglistener.batchDuration)}
237+
{UIUtils.formatDuration(streamingListener.batchDuration)}
199238
</li>
200239
<li>
201240
<strong>Input data size: </strong>
@@ -216,9 +255,15 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
216255
</ul>
217256
</div>
218257

219-
val content = summary ++ jobInfos.map(jobsTable).getOrElse {
220-
<div>Cannot find any job for Batch {formattedBatchTime}</div>
221-
}
258+
val jobTable =
259+
if (batchUIData.outputOpIdToSparkJobIds.isEmpty) {
260+
<div>Cannot find any job for Batch {formattedBatchTime}.</div>
261+
} else {
262+
generateJobTable(batchUIData)
263+
}
264+
265+
val content = summary ++ jobTable
266+
222267
UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
223268
}
224269
}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ import org.apache.spark.util.Distribution
3434
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
3535
extends StreamingListener with SparkListener {
3636

37-
import StreamingJobProgressListener._
38-
3937
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
4038
private val runningBatchInfos = new HashMap[Time, BatchInfo]
4139
private val completedBatchInfos = new Queue[BatchInfo]
@@ -45,7 +43,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
4543
private var totalProcessedRecords = 0L
4644
private val receiverInfos = new HashMap[Int, ReceiverInfo]
4745

48-
private val batchTimeToJobIds = new HashMap[Time, ArrayBuffer[(OutputOpId, JobId)]]
46+
private val batchTimeToBatchUIData = new HashMap[Time, BatchUIData]
4947

5048
val batchDuration = ssc.graph.batchDuration.milliseconds
5149

@@ -87,7 +85,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
8785
completedBatchInfos.enqueue(batchCompleted.batchInfo)
8886
if (completedBatchInfos.size > batchInfoLimit) {
8987
val removedBatch = completedBatchInfos.dequeue()
90-
batchTimeToJobIds.remove(removedBatch.batchTime)
88+
batchTimeToBatchUIData.remove(removedBatch.batchTime)
9189
}
9290
totalCompletedBatches += 1L
9391

@@ -97,8 +95,12 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
9795

9896
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
9997
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
100-
batchTimeToJobIds.getOrElseUpdate(batchTime, ArrayBuffer[(OutputOpId, JobId)]()) +=
101-
outputOpId -> jobStart.jobId
98+
val batchUIData = batchTimeToBatchUIData.getOrElseUpdate(batchTime, BatchUIData())
99+
// Because onJobStart and onBatchXXX messages are processed in different threads,
100+
// we may not be able to get the corresponding BatchInfo now. So here we only set
101+
// batchUIData.outputOpIdToSparkJobIds, batchUIData.batchInfo will be set in "getBatchUIData".
102+
batchUIData.outputOpIdToSparkJobIds.
103+
getOrElseUpdate(outputOpId, ArrayBuffer()) += jobStart.jobId
102104
}
103105
}
104106

@@ -206,20 +208,24 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
206208
Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
207209
}
208210

209-
def getBatchInfo(batchTime: Time): Option[BatchInfo] = synchronized {
211+
private def getBatchInfo(batchTime: Time): Option[BatchInfo] = {
210212
waitingBatchInfos.get(batchTime).orElse {
211213
runningBatchInfos.get(batchTime).orElse {
212214
completedBatchInfos.find(batch => batch.batchTime == batchTime)
213215
}
214216
}
215217
}
216218

217-
def getJobInfos(batchTime: Time): Option[Seq[(OutputOpId, JobId)]] = synchronized {
218-
batchTimeToJobIds.get(batchTime).map(_.toList)
219+
def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
220+
for (batchInfo <- getBatchInfo(batchTime)) yield {
221+
val batchUIData = batchTimeToBatchUIData.getOrElse(batchTime, BatchUIData(batchInfo))
222+
batchUIData.batchInfo = batchInfo
223+
batchUIData
224+
}
219225
}
220226
}
221227

222228
private[streaming] object StreamingJobProgressListener {
223-
type JobId = Int
229+
type SparkJobId = Int
224230
type OutputOpId = Int
225231
}

0 commit comments

Comments
 (0)