@@ -31,15 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
3131 private val inputStreams = new ArrayBuffer [InputDStream [_]]()
3232 private val outputStreams = new ArrayBuffer [DStream [_]]()
3333
34- val inputStreamNameAndID = new ArrayBuffer [(String , Int )]()
34+ @ volatile private var inputStreamNameAndID : Seq [(String , Int )] = Nil
3535
3636 var rememberDuration : Duration = null
3737 var checkpointInProgress = false
3838
3939 var zeroTime : Time = null
4040 var startTime : Time = null
4141 var batchDuration : Duration = null
42- var numReceivers : Int = 0
42+ @ volatile private var numReceivers : Int = 0
4343
4444 def start (time : Time ) {
4545 this .synchronized {
@@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
5050 outputStreams.foreach(_.remember(rememberDuration))
5151 outputStreams.foreach(_.validateAtStart())
5252 numReceivers = inputStreams.count(_.isInstanceOf [ReceiverInputDStream [_]])
53- inputStreams.foreach (is => inputStreamNameAndID. += (( is.name, is.id) ))
53+ inputStreamNameAndID = inputStreams.map (is => ( is.name, is.id))
5454 inputStreams.par.foreach(_.start())
5555 }
5656 }
@@ -111,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
111111 .toArray
112112 }
113113
114- def getReceiverNumber : Int = numReceivers
114+ def getNumReceivers : Int = numReceivers
115115
116- def getInputStreamNameAndID : ArrayBuffer [(String , Int )] = inputStreamNameAndID
116+ def getInputStreamNameAndID : Seq [(String , Int )] = inputStreamNameAndID
117117
118118 def generateJobs (time : Time ): Seq [Job ] = {
119119 logDebug(" Generating jobs for time " + time)
0 commit comments