Commit e83699e

Author: Andrew Or (authored and committed)

    Clean one more

1 parent: 8ac3074
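
This commit routes the user function passed to SparkContext.runApproximateJob through the context's clean() helper before handing it to the DAGScheduler, matching what the other job-submission paths already do, and adds regression tests confirming that both runApproximateJob and submitJob actually clean their closures. As a rough sketch of the helper the diff below calls (paraphrased from SparkContext of the same era; the exact signature may differ by version):

// Paraphrased sketch of SparkContext.clean. ClosureCleaner nulls out
// references to enclosing scopes that the closure does not actually use,
// so the function can be serialized and shipped to executors; it can also
// check serializability eagerly on the driver.
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
  ClosureCleaner.clean(f, checkSerializable)
  f
}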

File tree

core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala

2 files changed: +21 -1 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
@@ -1687,7 +1687,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val callSite = getCallSite
     logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
-    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
+    val cleanedFunc = clean(func)
+    val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout,
       localProperties.get)
     logInfo(
       "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")

core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -23,6 +23,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.{TaskContext, SparkContext, SparkException}
+import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.rdd.RDD
 
 class ClosureCleanerSuite extends FunSuite {

@@ -110,6 +111,8 @@ class ClosureCleanerSuite extends FunSuite {
       expectCorrectException { TestUserClosuresActuallyCleaned.testForeachPartitionAsync(rdd) }
       expectCorrectException { TestUserClosuresActuallyCleaned.testRunJob1(sc) }
       expectCorrectException { TestUserClosuresActuallyCleaned.testRunJob2(sc) }
+      expectCorrectException { TestUserClosuresActuallyCleaned.testRunApproximateJob(sc) }
+      expectCorrectException { TestUserClosuresActuallyCleaned.testSubmitJob(sc) }
     }
   }
 }

@@ -299,4 +302,20 @@ private object TestUserClosuresActuallyCleaned {
     val rdd = sc.parallelize(1 to 10, 10)
     sc.runJob(rdd, { iter: Iterator[Int] => return; 1 } )
   }
+  def testRunApproximateJob(sc: SparkContext): Unit = {
+    val rdd = sc.parallelize(1 to 10, 10)
+    val evaluator = new CountEvaluator(1, 0.5)
+    sc.runApproximateJob(
+      rdd, { (ctx: TaskContext, iter: Iterator[Int]) => return; 1L }, evaluator, 1000)
+  }
+  def testSubmitJob(sc: SparkContext): Unit = {
+    val rdd = sc.parallelize(1 to 10, 10)
+    sc.submitJob(
+      rdd,
+      { _ => return; 1 }: Iterator[Int] => Int,
+      Seq.empty,
+      { case (_, _) => return }: (Int, Int) => Unit,
+      { return }
+    )
+  }
 }
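
How these tests detect cleaning: ClosureCleaner rejects closures that contain a return statement by throwing ReturnStatementInClosureException, so each test closure deliberately embeds `return` as a sentinel. If the API under test runs its closure through the cleaner, the exception fires; if it skips cleaning, the test fails. A paraphrased sketch of the suite's helper (the exact body may differ from the actual source):

// Paraphrased sketch of expectCorrectException. Passes only if cleaning
// the closure raised the expected exception, proving the API routed the
// user-provided closure through ClosureCleaner.
private def expectCorrectException(body: => Unit): Unit = {
  try {
    body
  } catch {
    case _: ReturnStatementInClosureException => // success: the closure was cleaned
    case e: Throwable =>
      fail(s"Expected ReturnStatementInClosureException but got $e; " +
        "the user-provided closure was not actually cleaned.")
  }
}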
