Commit 1282b10

Handle some corner cases and add tests for StreamingJobProgressListener

1 parent 77a69ae · commit 1282b10

File tree: 4 files changed, +185 −27 lines

streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala

Lines changed: 3 additions & 13 deletions
@@ -30,10 +30,6 @@ import org.apache.spark.ui.{UIUtils, WebUIPage}
 import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
-private[ui] case class BatchUIData(
-    var batchInfo: BatchInfo = null,
-    outputOpIdToSparkJobIds: Map[OutputOpId, ArrayBuffer[SparkJobId]] = Map()) {
-}
 
 private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   private val streamingListener = parent.listener
@@ -184,18 +180,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
    * Generate the job table for the batch.
    */
   private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
-    val outputOpIdWithSparkJobIds: Seq[(OutputOpId, Seq[SparkJobId])] = {
-      batchUIData.outputOpIdToSparkJobIds.toSeq.sortBy(_._1). // sorted by OutputOpId
-        map { case (outputOpId, jobs) =>
-          (outputOpId, jobs.sorted.toSeq) // sort JobIds for each OutputOpId
-        }
-    }
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithSparkJobIds.map {
-        case (outputOpId, sparkJobIds) =>
+      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+        batchUIData.outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
           // Filter out spark Job ids that don't exist in sparkListener
           (outputOpId, sparkJobIds.flatMap(getJobData))
-      }
+        }
 
       <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
         <thead>
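
Note: sparkJobIds.flatMap(getJobData) drops any Spark job id the UI listener does not know about, because getJobData returns an Option. A minimal sketch of that filtering idiom (the lookup table and ids below are illustrative, not from the commit):

object FlatMapFilterSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the listener's job data; only ids 0 and 2 are known.
    val known = Map(0 -> "job-0", 2 -> "job-2")
    def getJobData(id: Int): Option[String] = known.get(id)

    // flatMap over Option keeps only the ids that resolve to data.
    println(Seq(0, 1, 2).flatMap(getJobData)) // List(job-0, job-2)
  }
}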
streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala (new file)

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
+
+/**
+ * The data in outputOpIdToSparkJobIds are sorted by `OutputOpId` in ascending order, and for each
+ * `OutputOpId`, the corresponding `SparkJobId`s are sorted in ascending order.
+ */
+private[ui] case class BatchUIData(
+    batchInfo: BatchInfo,
+    outputOpIdToSparkJobIds: Seq[(OutputOpId, Seq[SparkJobId])]) {
+}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 43 additions & 14 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.ui
 
+import java.util.LinkedHashMap
+import java.util.{Map => JMap}
 import java.util.Properties
 
 import scala.collection.mutable.{ArrayBuffer, Queue, HashMap}
@@ -34,6 +36,8 @@ import org.apache.spark.util.Distribution
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   extends StreamingListener with SparkListener {
 
+  import StreamingJobProgressListener._
+
   private val waitingBatchInfos = new HashMap[Time, BatchInfo]
   private val runningBatchInfos = new HashMap[Time, BatchInfo]
   private val completedBatchInfos = new Queue[BatchInfo]
@@ -43,7 +47,28 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   private var totalProcessedRecords = 0L
   private val receiverInfos = new HashMap[Int, ReceiverInfo]
 
-  private val batchTimeToBatchUIData = new HashMap[Time, BatchUIData]
+  // Because onJobStart and onBatchXXX messages are processed in different threads,
+  // we may not be able to get the corresponding BatchInfo when receiving onJobStart. So here we
+  // cannot use a map of (Time, BatchUIData).
+  private[ui] val batchTimeToOutputOpIdToSparkJobIds =
+    new LinkedHashMap[Time, OutputOpIdToSparkJobIds] {
+      override def removeEldestEntry(p1: JMap.Entry[Time, OutputOpIdToSparkJobIds]): Boolean = {
+        // If a lot of "onBatchCompleted"s happen before "onJobStart" (imagine
+        // SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
+        // may add some information for a removed batch when processing "onJobStart". That would
+        // be a memory leak.
+        //
+        // To avoid the memory leak, we control the size of "batchTimeToOutputOpIdToSparkJobIds"
+        // and evict the eldest one.
+        //
+        // Note: if "onJobStart" happens before "onBatchSubmitted", the size of
+        // "batchTimeToOutputOpIdToSparkJobIds" may be greater than the number of the retained
+        // batches temporarily, so here we use "10" to handle such a case. This is not a perfect
+        // solution, but at least it can handle most cases.
+        size() > waitingBatchInfos.size + runningBatchInfos.size + completedBatchInfos.size + 10
+      }
+    }
 
   val batchDuration = ssc.graph.batchDuration.milliseconds
 
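Aside: the eviction trick above relies on java.util.LinkedHashMap invoking removeEldestEntry after every put and dropping the eldest (insertion-order) entry whenever the override returns true. A minimal, self-contained sketch of the pattern, with an illustrative fixed bound of 3:

import java.util.{LinkedHashMap, Map => JMap}

object BoundedMapSketch {
  def main(args: Array[String]): Unit = {
    // Keep at most 3 entries; the eldest entry is dropped on overflow.
    val bounded = new LinkedHashMap[Int, String]() {
      override def removeEldestEntry(eldest: JMap.Entry[Int, String]): Boolean =
        size() > 3
    }
    (1 to 5).foreach(i => bounded.put(i, "batch-" + i))
    println(bounded) // {3=batch-3, 4=batch-4, 5=batch-5}
  }
}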

@@ -85,7 +110,7 @@
     completedBatchInfos.enqueue(batchCompleted.batchInfo)
     if (completedBatchInfos.size > batchInfoLimit) {
       val removedBatch = completedBatchInfos.dequeue()
-      batchTimeToBatchUIData.remove(removedBatch.batchTime)
+      batchTimeToOutputOpIdToSparkJobIds.remove(removedBatch.batchTime)
     }
     totalCompletedBatches += 1L
 
@@ -95,12 +120,12 @@
 
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
     getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
-      val batchUIData = batchTimeToBatchUIData.getOrElseUpdate(batchTime, BatchUIData())
-      // Because onJobStart and onBatchXXX messages are processed in different threads,
-      // we may not be able to get the corresponding BatchInfo now. So here we only set
-      // batchUIData.outputOpIdToSparkJobIds, batchUIData.batchInfo will be set in "getBatchUIData".
-      batchUIData.outputOpIdToSparkJobIds.
-        getOrElseUpdate(outputOpId, ArrayBuffer()) += jobStart.jobId
+      var outputOpIdToSparkJobIds = batchTimeToOutputOpIdToSparkJobIds.get(batchTime)
+      if (outputOpIdToSparkJobIds == null) {
+        outputOpIdToSparkJobIds = new OutputOpIdToSparkJobIds()
+        batchTimeToOutputOpIdToSparkJobIds.put(batchTime, outputOpIdToSparkJobIds)
+      }
+      outputOpIdToSparkJobIds.getOrElseUpdate(outputOpId, ArrayBuffer()) += jobStart.jobId
     }
   }
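Aside: java.util.LinkedHashMap has no getOrElseUpdate, hence the explicit get / null-check / put above before using Scala's getOrElseUpdate on the inner map. A small sketch of the same get-or-create pattern (names are illustrative):

import java.util.LinkedHashMap
import scala.collection.mutable.{ArrayBuffer, HashMap}

object GetOrCreateSketch {
  type Inner = HashMap[Int, ArrayBuffer[Int]]

  def main(args: Array[String]): Unit = {
    val outer = new LinkedHashMap[Long, Inner]()

    def record(batchTime: Long, outputOpId: Int, jobId: Int): Unit = {
      var inner = outer.get(batchTime) // Java maps return null when a key is absent
      if (inner == null) {
        inner = new Inner()
        outer.put(batchTime, inner)
      }
      inner.getOrElseUpdate(outputOpId, ArrayBuffer()) += jobId
    }

    record(1000L, 0, 0)
    record(1000L, 0, 1)
    println(outer.get(1000L)) // Map(0 -> ArrayBuffer(0, 1))
  }
}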

@@ -116,9 +141,7 @@
     }
   }
 
-  def numReceivers: Int = synchronized {
-    ssc.graph.getReceiverInputStreams().size
-  }
+  def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
 
   def numTotalCompletedBatches: Long = synchronized {
     totalCompletedBatches
@@ -218,14 +241,20 @@
 
   def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
     for (batchInfo <- getBatchInfo(batchTime)) yield {
-      val batchUIData = batchTimeToBatchUIData.getOrElse(batchTime, BatchUIData(batchInfo))
-      batchUIData.batchInfo = batchInfo
-      batchUIData
+      // outputOpIdToSparkJobIds is a sorted copy of the original one, so the caller can freely
+      // use the data in BatchUIData.
+      val outputOpIdToSparkJobIds = Option(batchTimeToOutputOpIdToSparkJobIds.get(batchTime)).
+        getOrElse(Map.empty).toSeq.sortWith(_._1 < _._1). // sorted by OutputOpId
+        map { case (outputOpId, jobs) =>
+          (outputOpId, jobs.sorted.toSeq) // sort JobIds for each OutputOpId
+        }
+      BatchUIData(batchInfo, outputOpIdToSparkJobIds)
     }
   }
 }
 
 private[streaming] object StreamingJobProgressListener {
   type SparkJobId = Int
   type OutputOpId = Int
+  private type OutputOpIdToSparkJobIds = HashMap[OutputOpId, ArrayBuffer[SparkJobId]]
 }
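
Aside: a quick illustration of the normalization getBatchUIData performs, turning an unordered mutable map into a Seq sorted by OutputOpId with each job-id list sorted (the inputs are made up for the example):

import scala.collection.mutable.{ArrayBuffer, HashMap}

object SortedCopySketch {
  def main(args: Array[String]): Unit = {
    val raw = HashMap(1 -> ArrayBuffer(3, 1), 0 -> ArrayBuffer(2))
    val sorted = raw.toSeq.sortWith(_._1 < _._1).map { case (outputOpId, jobs) =>
      (outputOpId, jobs.sorted.toSeq) // sort SparkJobIds within each OutputOpId
    }
    println(sorted) // elements: (0, Seq(2)), (1, Seq(1, 3))
  }
}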

streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala

Lines changed: 108 additions & 0 deletions
@@ -17,8 +17,11 @@
 
 package org.apache.spark.streaming.ui
 
+import java.util.Properties
+
 import org.scalatest.Matchers
 
+import org.apache.spark.scheduler.SparkListenerJobStart
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.{Duration, Time, Milliseconds, TestSuiteBase}
@@ -64,6 +67,48 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.numTotalProcessedRecords should be (0)
     listener.numTotalReceivedRecords should be (600)
 
+    // onJobStart
+    val properties1 = new Properties()
+    properties1.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
+    properties1.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 0.toString)
+    val jobStart1 = SparkListenerJobStart(jobId = 0,
+      0L, // unused
+      Nil, // unused
+      properties1)
+    listener.onJobStart(jobStart1)
+
+    val properties2 = new Properties()
+    properties2.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
+    properties2.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 0.toString)
+    val jobStart2 = SparkListenerJobStart(jobId = 1,
+      0L, // unused
+      Nil, // unused
+      properties2)
+    listener.onJobStart(jobStart2)
+
+    val properties3 = new Properties()
+    properties3.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
+    properties3.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 1.toString)
+    val jobStart3 = SparkListenerJobStart(jobId = 0,
+      0L, // unused
+      Nil, // unused
+      properties3)
+    listener.onJobStart(jobStart3)
+
+    val properties4 = new Properties()
+    properties4.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
+    properties4.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 1.toString)
+    val jobStart4 = SparkListenerJobStart(jobId = 1,
+      0L, // unused
+      Nil, // unused
+      properties4)
+    listener.onJobStart(jobStart4)
+
+    val batchUIData = listener.getBatchUIData(Time(1000))
+    assert(batchUIData != None)
+    assert(batchUIData.get.batchInfo === batchInfoStarted)
+    assert(batchUIData.get.outputOpIdToSparkJobIds === Seq(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
     // onBatchCompleted
     val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
     listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
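
The test drives onJobStart with the same job properties that JobScheduler attaches to each output operation, which is how the listener ties a Spark job back to its batch. A hedged sketch of how such properties can be parsed; the real getBatchTimeAndOutputOpId is not part of this diff, and the literal key strings below are placeholders for JobScheduler.BATCH_TIME_PROPERTY_KEY and JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY:

import java.util.Properties

object PropertyLinkSketch {
  // Placeholder keys; the real constants live in JobScheduler.
  val BatchTimeKey = "batchTime"
  val OutputOpIdKey = "outputOpId"

  // Returns None unless both properties are present.
  def getBatchTimeAndOutputOpId(properties: Properties): Option[(Long, Int)] =
    for {
      batchTime <- Option(properties.getProperty(BatchTimeKey))
      outputOpId <- Option(properties.getProperty(OutputOpIdKey))
    } yield (batchTime.toLong, outputOpId.toInt)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty(BatchTimeKey, "1000")
    props.setProperty(OutputOpIdKey, "0")
    println(getBatchTimeAndOutputOpId(props)) // Some((1000,0))
  }
}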
@@ -116,4 +161,67 @@
     listener.retainedCompletedBatches.size should be (limit)
     listener.numTotalCompletedBatches should be(limit + 10)
   }
+
+  test("disorder onJobStart and onBatchXXX") {
+    val ssc = setupStreams(input, operation)
+    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    // fill up completedBatchInfos
+    for (i <- 0 until limit) {
+      val batchInfoCompleted =
+        BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+      val properties = new Properties()
+      properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + i * 100).toString)
+      properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
+      val jobStart = SparkListenerJobStart(jobId = 1,
+        0L, // unused
+        Nil, // unused
+        properties)
+      listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+      listener.onJobStart(jobStart)
+    }
+
+    // onJobStart happens before onBatchSubmitted
+    val properties = new Properties()
+    properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + limit * 100).toString)
+    properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
+    val jobStart = SparkListenerJobStart(jobId = 0,
+      0L, // unused
+      Nil, // unused
+      properties)
+    listener.onJobStart(jobStart)
+
+    val batchInfoSubmitted =
+      BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None)
+    listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
+
+    // We can still see the info recorded by onJobStart
+    listener.getBatchUIData(Time(1000 + limit * 100)) should be
+      Some(BatchUIData(batchInfoSubmitted, Seq((0, Seq(0)))))
+
+    // A lot of "onBatchCompleted"s happen before "onJobStart"
+    for (i <- limit + 1 to limit * 2) {
+      val batchInfoCompleted =
+        BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
+      listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+    }
+
+    for (i <- limit + 1 to limit * 2) {
+      val properties = new Properties()
+      properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + i * 100).toString)
+      properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
+      val jobStart = SparkListenerJobStart(jobId = 1,
+        0L, // unused
+        Nil, // unused
+        properties)
+      listener.onJobStart(jobStart)
+    }
+
+    // We should not leak memory
+    listener.batchTimeToOutputOpIdToSparkJobIds.size() should be <=
+      (listener.waitingBatches.size + listener.runningBatches.size +
+      listener.retainedCompletedBatches.size + 10)
+  }
 }
