Skip to content

Commit 57e9f29

Browse files
author
Andrew Or
committed
[SPARK-7318] [STREAMING] DStream cleans objects that are not closures
I added a check in `ClosureCleaner#clean` to fail fast if this is detected in the future. tdas

Author: Andrew Or <[email protected]>

Closes apache#5860 from andrewor14/streaming-closure-cleaner and squashes the following commits:

- 8e971d7 [Andrew Or] Do not throw exception if object to clean is not closure
- 5ee4e25 [Andrew Or] Fix tests
- eed3390 [Andrew Or] Merge branch 'master' of github.com:apache/spark into streaming-closure-cleaner
- 67eeff4 [Andrew Or] Add tests
- a4fa768 [Andrew Or] Clean the closure, not the RDD
1 parent 1fdabf8 commit 57e9f29

File tree

3 files changed

+9
-3
lines changed

3 files changed

+9
-3
lines changed

core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ private[spark] object ClosureCleaner extends Logging {
       cleanTransitively: Boolean,
       accessedFields: Map[Class[_], Set[String]]): Unit = {
 
+    if (!isClosure(func.getClass)) {
+      logWarning("Expected a closure; got " + func.getClass.getName)
+      return
+    }
+
     // TODO: clean all inner closures first. This requires us to find the inner objects.
     // TODO: cache outerClasses / innerClasses / accessedFields

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
+    val cleanedF = context.sparkContext.clean(transformFunc, false)
+    transform((r: RDD[T], t: Time) => cleanedF(r))
   }
 
   /**

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     }
 
     withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
-      val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
-      val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
+      val receiver1 = new FakeReceiver(sendData = true)
+      val receiver2 = new FakeReceiver(sendData = true)
       val receiverStream1 = ssc.receiverStream(receiver1)
       val receiverStream2 = ssc.receiverStream(receiver2)
       receiverStream1.register()

0 commit comments

Comments (0)