@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
2121import java .util .concurrent .ConcurrentLinkedQueue
2222import java .util .concurrent .atomic .AtomicLong
2323
24+ import scala .collection .JavaConverters ._
2425import scala .collection .mutable
2526import scala .collection .mutable .ArrayBuffer
2627
@@ -84,7 +85,7 @@ private[streaming] class ReceiverSupervisorImpl(
8485 cleanupOldBlocks(threshTime)
8586 case UpdateRateLimit (eps) =>
8687 logInfo(s " Received a new rate limit: $eps. " )
87- registeredBlockGenerators.foreach { bg =>
88+ registeredBlockGenerators.asScala. foreach { bg =>
8889 bg.updateRate(eps)
8990 }
9091 }
@@ -170,11 +171,11 @@ private[streaming] class ReceiverSupervisorImpl(
170171 }
171172
172173 override protected def onStart () {
173- registeredBlockGenerators.foreach { _.start() }
174+ registeredBlockGenerators.asScala. foreach { _.start() }
174175 }
175176
176177 override protected def onStop (message : String , error : Option [Throwable ]) {
177- registeredBlockGenerators.foreach { _.stop() }
178+ registeredBlockGenerators.asScala. foreach { _.stop() }
178179 env.rpcEnv.stop(endpoint)
179180 }
180181
@@ -194,10 +195,10 @@ private[streaming] class ReceiverSupervisorImpl(
194195 override def createBlockGenerator (
195196 blockGeneratorListener : BlockGeneratorListener ): BlockGenerator = {
196197 // Cleanup BlockGenerators that have already been stopped
197- registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
198+ registeredBlockGenerators.removeAll( registeredBlockGenerators.asScala. filter{ _.isStopped() })
198199
199200 val newBlockGenerator = new BlockGenerator (blockGeneratorListener, streamId, env.conf)
200- registeredBlockGenerators += newBlockGenerator
201+ registeredBlockGenerators.add( newBlockGenerator)
201202 newBlockGenerator
202203 }
203204
0 commit comments