
Commit 8dd2575

uncleGen authored and cmonkey committed
[SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs
## What changes were proposed in this pull request?

When DStreamGraph is generating a job, it holds a lock and blocks other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to a slow Kafka cluster to fetch metadata). It is better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that the UI is not blocked by job generation.

## How was this patch tested?

Existing unit tests.

cc zsxwing

Author: uncleGen <[email protected]>

Closes apache#16601 from uncleGen/SPARK-19182.
1 parent 7c2da35 commit 8dd2575
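For readers skimming the diffs below, here is a minimal, self-contained sketch of the locking pattern this patch applies. All names in it (`Graph`, `addStream`, `slowGenerateJobs`) are hypothetical stand-ins, not Spark APIs, and the `Runnable` lambda assumes Scala 2.12+: the writer still mutates state under the object lock, but also publishes an immutable snapshot to a `@volatile` field, and readers touch only that field, so they never wait on the lock.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch only: hypothetical stand-ins for DStreamGraph's pattern.
object VolatileSnapshotSketch {

  final class Graph {
    // Mutable state, guarded by `this` (like DStreamGraph.inputStreams).
    private val streams = ArrayBuffer.empty[(String, Int)]

    // Immutable snapshot, safely published to readers without locking.
    @volatile private var snapshot: Seq[(String, Int)] = Nil

    def addStream(name: String, id: Int): Unit = this.synchronized {
      streams += ((name, id))
      snapshot = streams.toList // refreshed while the lock is held
    }

    // Stands in for generateJobs() stalling on slow Kafka metadata calls.
    def slowGenerateJobs(): Unit = this.synchronized {
      Thread.sleep(1000)
    }

    // Lock-free read path for the UI thread.
    def streamNamesAndIds: Seq[(String, Int)] = snapshot
  }

  def main(args: Array[String]): Unit = {
    val g = new Graph
    g.addStream("kafka stream", 0)
    new Thread(() => g.slowGenerateJobs()).start() // holds the lock for 1s
    Thread.sleep(50) // let slowGenerateJobs take the lock first
    println(g.streamNamesAndIds) // returns immediately: List((kafka stream,0))
  }
}
```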

File tree

2 files changed: +13 −8 lines


streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()
 
+  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil
+
   var rememberDuration: Duration = null
   var checkpointInProgress = false
 
   var zeroTime: Time = null
   var startTime: Time = null
   var batchDuration: Duration = null
+  @volatile private var numReceivers: Int = 0
 
   def start(time: Time) {
     this.synchronized {
@@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       startTime = time
       outputStreams.foreach(_.initialize(zeroTime))
       outputStreams.foreach(_.remember(rememberDuration))
-      outputStreams.foreach(_.validateAtStart)
+      outputStreams.foreach(_.validateAtStart())
+      numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
+      inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
       inputStreams.par.foreach(_.start())
     }
   }
@@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       .toArray
   }
 
-  def getInputStreamName(streamId: Int): Option[String] = synchronized {
-    inputStreams.find(_.id == streamId).map(_.name)
-  }
+  def getNumReceivers: Int = numReceivers
+
+  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
 
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)
```

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numInactiveReceivers: Int = {
-    ssc.graph.getReceiverInputStreams().length - numActiveReceivers
+    ssc.graph.getNumReceivers - numActiveReceivers
   }
 
   def numTotalCompletedBatches: Long = synchronized {
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-    completedBatchUIData.toSeq
+    completedBatchUIData.toIndexedSeq
   }
 
   def streamName(streamId: Int): Option[String] = {
-    ssc.graph.getInputStreamName(streamId)
+    ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
   }
 
   /**
    * Return all InputDStream Ids
    */
-  def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
+  def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)
 
   /**
    * Return all of the record rates for each InputDStream in each batch. The key of the return value
```
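On the listener side, streamName and streamIds are now answered from the cached (name, id) pairs instead of re-entering DStreamGraph's lock, and toIndexedSeq copies the completed batches into an immutable Vector, so the snapshot handed to the UI cannot change after the synchronized block exits. A quick stand-alone check of the tuple lookups (the sample pairs here are made up; the real ones come from getInputStreamNameAndID):

```scala
object StreamLookupSketch {
  def main(args: Array[String]): Unit = {
    val nameAndIds: Seq[(String, Int)] = Seq(("kafka stream", 0), ("socket stream", 1))

    // Same shape as the new streamName: find the pair by id, keep the name.
    def streamName(streamId: Int): Option[String] =
      nameAndIds.find(_._2 == streamId).map(_._1)

    // Same shape as the new streamIds: project out the ids.
    def streamIds: Seq[Int] = nameAndIds.map(_._2)

    assert(streamName(0).contains("kafka stream"))
    assert(streamName(7).isEmpty) // unknown id yields None, as before
    assert(streamIds == Seq(0, 1))
  }
}
```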
