
Commit eaa7b15

update
1 parent: 46036bf

2 files changed: 19 additions, 12 deletions


streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
(15 additions, 8 deletions)
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()
 
+  val inputStreamNameAndID = new ArrayBuffer[(String, Int)]()
+
   var rememberDuration: Duration = null
   var checkpointInProgress = false
 
   var zeroTime: Time = null
   var startTime: Time = null
   var batchDuration: Duration = null
+  var numReceivers: Int = 0
 
   def start(time: Time) {
     this.synchronized {
@@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       startTime = time
       outputStreams.foreach(_.initialize(zeroTime))
       outputStreams.foreach(_.remember(rememberDuration))
-      outputStreams.foreach(_.validateAtStart)
+      outputStreams.foreach(_.validateAtStart())
+      numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
+      inputStreams.foreach(is => inputStreamNameAndID.+=((is.name, is.id)))
       inputStreams.par.foreach(_.start())
     }
   }
@@ -106,16 +111,18 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       .toArray
   }
 
-  def getInputStreamName(streamId: Int): Option[String] = synchronized {
-    inputStreams.find(_.id == streamId).map(_.name)
-  }
+  def getReceiverNumber: Int = numReceivers
+
+  def getInputStreamNameAndID: ArrayBuffer[(String, Int)] = inputStreamNameAndID
 
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)
-    val jobs = getOutputStreams().flatMap { outputStream =>
-      val jobOption = outputStream.generateJob(time)
-      jobOption.foreach(_.setCallSite(outputStream.creationSite))
-      jobOption
+    val jobs = this.synchronized {
+      outputStreams.flatMap { outputStream =>
+        val jobOption = outputStream.generateJob(time)
+        jobOption.foreach(_.setCallSite(outputStream.creationSite))
+        jobOption
+      }
     }
     logDebug("Generated " + jobs.length + " jobs for time " + time)
     jobs
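
Taken together, the DStreamGraph changes do three things: they cache the receiver count and the input streams' (name, id) pairs once in start(), they add explicit parentheses to the side-effecting validateAtStart() call, and they move job generation inside this.synchronized so the output-stream buffer cannot change mid-iteration. A plausible motivation (my reading, not stated in the commit) is that UI code previously had to synchronize on the graph to look up stream metadata, and can now read an immutable start-time snapshot instead. A minimal, self-contained sketch of that pattern, using hypothetical names rather than the real Spark classes:

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for DStreamGraph: snapshot graph metadata once,
// under the lock, at start-up; serve UI reads from the snapshot afterwards.
class MiniGraph {
  private val inputStreams = ArrayBuffer[(String, Int, Boolean)]() // (name, id, isReceiver)
  private val inputStreamNameAndID = ArrayBuffer[(String, Int)]()
  @volatile private var numReceivers = 0

  def addInput(name: String, id: Int, isReceiver: Boolean): Unit = synchronized {
    inputStreams += ((name, id, isReceiver))
  }

  def start(): Unit = synchronized {
    // Computed exactly once; after start() the UI thread reads these fields
    // without taking the graph lock, so it cannot block behind a
    // long-running job-generation cycle that holds that lock.
    numReceivers = inputStreams.count(_._3)
    inputStreamNameAndID ++= inputStreams.map(s => (s._1, s._2))
  }

  def getReceiverNumber: Int = numReceivers
  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
}

The snapshot is only safe because the DStream graph is fixed once the streaming context starts; if inputs could be registered after start(), the cached values would go stale.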

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
(4 additions, 4 deletions)
@@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numInactiveReceivers: Int = {
-    ssc.graph.getReceiverInputStreams().length - numActiveReceivers
+    ssc.graph.getReceiverNumber - numActiveReceivers
   }
 
   def numTotalCompletedBatches: Long = synchronized {
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-    completedBatchUIData.toSeq
+    completedBatchUIData
   }
 
   def streamName(streamId: Int): Option[String] = {
-    ssc.graph.getInputStreamName(streamId)
+    ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
   }
 
   /**
    * Return all InputDStream Ids
    */
-  def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
+  def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)
 
   /**
    * Return all of the record rates for each InputDStream in each batch. The key of the return value
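
On the listener side, both lookups now read the cached (name, id) snapshot instead of searching the live input streams under the graph lock; the diff also drops the defensive .toSeq copy in retainedCompletedBatches, returning the underlying collection directly. A standalone usage sketch of the new lookups, with made-up stream data for illustration:

object ListenerLookupDemo extends App {
  // Made-up stream metadata standing in for the graph's cached snapshot.
  val inputStreamNameAndID = Seq(("socket stream [0]", 0), ("kafka stream [1]", 1))

  // streamName is now a simple scan over the snapshot rather than a
  // synchronized search of the live input streams.
  def streamName(streamId: Int): Option[String] =
    inputStreamNameAndID.find(_._2 == streamId).map(_._1)

  def streamIds: Seq[Int] = inputStreamNameAndID.map(_._2)

  assert(streamName(1).contains("kafka stream [1]"))
  assert(streamName(7).isEmpty)
  assert(streamIds == Seq(0, 1))
}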
