Skip to content

Commit 8e971d7

Browse files
author
Andrew Or
committed
Do not throw exception if object to clean is not closure
Throwing the exception breaks a valid use case where user code passes a case class into `map`. See ml.NormalizerSuite.
1 parent 5ee4e25 commit 8e971d7

File tree

3 files changed

+4
-15
lines changed

3 files changed

+4
-15
lines changed

core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,8 @@ private[spark] object ClosureCleaner extends Logging {
180180
accessedFields: Map[Class[_], Set[String]]): Unit = {
181181

182182
if (!isClosure(func.getClass)) {
183-
throw new IllegalArgumentException("Expected a closure; got " + func.getClass.getName)
183+
logWarning("Expected a closure; got " + func.getClass.getName)
184+
return
184185
}
185186

186187
// TODO: clean all inner closures first. This requires us to find the inner objects.

core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,6 @@ class ClosureCleanerSuite extends FunSuite {
6363
val result = TestObjectWithNestedReturns.run()
6464
assert(result === 1)
6565
}
66-
67-
test("should clean only closures") {
68-
withSpark(new SparkContext("local", "test")) { sc =>
69-
val rdd = sc.makeRDD(1 to 100)
70-
intercept[IllegalArgumentException] { sc.clean(new Integer(1)) }
71-
intercept[IllegalArgumentException] { sc.clean("not a closure") }
72-
intercept[IllegalArgumentException] { sc.clean(rdd) }
73-
sc.clean(() => new Integer(1))
74-
sc.clean(() => "part of a closure")
75-
sc.clean(() => rdd)
76-
}
77-
}
7866
}
7967

8068
// A non-serializable class we create in closures to make sure that we aren't

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
256256
}
257257

258258
withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
259-
val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
260-
val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
259+
val receiver1 = new FakeReceiver(sendData = true)
260+
val receiver2 = new FakeReceiver(sendData = true)
261261
val receiverStream1 = ssc.receiverStream(receiver1)
262262
val receiverStream2 = ssc.receiverStream(receiver2)
263263
receiverStream1.register()

0 commit comments

Comments (0)