Skip to content

Commit b63c136

Browse files
committed
Fix compilation
1 parent a566d91 commit b63c136

File tree

1 file changed

+2
-1
lines changed

1 file changed

+2
-1
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ private[streaming] class ReceiverSupervisorImpl(
195195
override def createBlockGenerator(
196196
blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
197197
// Cleanup BlockGenerators that have already been stopped
198-
registeredBlockGenerators.removeAll(registeredBlockGenerators.asScala.filter{ _.isStopped() })
198+
val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
199+
stoppedGenerators.foreach(registeredBlockGenerators.remove(_))
199200

200201
val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
201202
registeredBlockGenerators.add(newBlockGenerator)

0 commit comments

Comments
 (0)