From 19e33b4f6e3e8534fa6f7b63de458a0ee11b14d6 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 29 Apr 2015 12:42:38 -0700 Subject: [PATCH 1/6] Add tests for all public RDD APIs that take in closures Tests should fail as of this commit because the issue hasn't been fixed yet. --- .../apache/spark/util/ClosureCleaner.scala | 5 +- .../spark/util/ClosureCleanerSuite.scala | 128 +++++++++++++++++- 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index e3f52f6ff1e6..fb2d873269fb 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -195,6 +195,9 @@ private[spark] object ClosureCleaner extends Logging { } } +private[spark] class ReturnStatementInClosureException + extends SparkException("Return statements aren't allowed in Spark closures") + private[spark] class ReturnStatementFinder extends ClassVisitor(ASM4) { override def visitMethod(access: Int, name: String, desc: String, @@ -203,7 +206,7 @@ class ReturnStatementFinder extends ClassVisitor(ASM4) { new MethodVisitor(ASM4) { override def visitTypeInsn(op: Int, tp: String) { if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) { - throw new SparkException("Return statements aren't allowed in Spark closures") + throw new ReturnStatementInClosureException } } } diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index c47162779bbb..5b83d8612f0a 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.util +import java.io.NotSerializableException + import org.scalatest.FunSuite import org.apache.spark.LocalSparkContext._ -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{TaskContext, SparkContext, SparkException} +import org.apache.spark.rdd.RDD class ClosureCleanerSuite extends FunSuite { test("closures inside an object") { @@ -52,17 +55,63 @@ class ClosureCleanerSuite extends FunSuite { } test("toplevel return statements in closures are identified at cleaning time") { - val ex = intercept[SparkException] { + intercept[ReturnStatementInClosureException] { TestObjectWithBogusReturns.run() } - - assert(ex.getMessage.contains("Return statements aren't allowed in Spark closures")) } test("return statements from named functions nested in closures don't raise exceptions") { val result = TestObjectWithNestedReturns.run() assert(result == 1) } + + test("user provided closures are actually cleaned") { + + // We use return statements as an indication that a closure is actually being cleaned + // We expect closure cleaner to find the return statements in the user provided closures + def expectCorrectException(body: => Unit): Unit = { + try { + body + } catch { + case rse: ReturnStatementInClosureException => // Success! 
+ case e @ (_: NotSerializableException | _: SparkException) => + fail(s"Expected ReturnStatementInClosureException, but got $e.\n" + + "This means the closure provided by user is not actually cleaned.") + } + } + + withSpark(new SparkContext("local", "test")) { sc => + val rdd = sc.parallelize(1 to 10) + val pairRdd = rdd.map { i => (i, i) } + expectCorrectException { TestUserClosuresActuallyCleaned.testMap(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMap(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testFilter(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testSortBy(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions4(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testForeach(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testForeachPartition(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testReduce(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testTreeReduce(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testFold(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testAggregate(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testTreeAggregate(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testCombineByKey(pairRdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testAggregateByKey(pairRdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testFoldByKey(pairRdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testReduceByKey(pairRdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testMapValues(pairRdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapValues(pairRdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testForeachAsync(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testForeachPartitionAsync(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testRunJob1(sc) } + expectCorrectException { TestUserClosuresActuallyCleaned.testRunJob2(sc) } + } + } } // A non-serializable class we create in closures to make sure that we aren't @@ -180,3 +229,74 @@ class TestClassWithNesting(val y: Int) extends Serializable { } } } + +/** + * Test whether closures passed in through public APIs are actually cleaned. + * + * We put a return statement in each of these closures as a mechanism to detect whether the + * ClosureCleaner actually cleaned our closure. If it did, then it would throw an appropriate + * exception explicitly complaining about the return statement. Otherwise, we know the + * ClosureCleaner did not actually clean our closure, in which case we should fail the test. 
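+ *
+ * For example, testMap below calls rdd.map { _ => return; 0 }.count(): if map actually runs
+ * the closure through the cleaner, the cleaner sees the scala.runtime.NonLocalReturnControl
+ * that the non-local return compiles to and throws ReturnStatementInClosureException before
+ * the job is ever run.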
+ */ +private object TestUserClosuresActuallyCleaned { + def testMap(rdd: RDD[Int]): Unit = { rdd.map { _ => return; 0 }.count() } + def testFlatMap(rdd: RDD[Int]): Unit = { rdd.flatMap { _ => return; Seq() }.count() } + def testFilter(rdd: RDD[Int]): Unit = { rdd.filter { _ => return; true }.count() } + def testSortBy(rdd: RDD[Int]): Unit = { rdd.sortBy { _ => return; 1 }.count() } + def testKeyBy(rdd: RDD[Int]): Unit = { rdd.keyBy { _ => return; 1 }.count() } + def testGroupBy(rdd: RDD[Int]): Unit = { rdd.groupBy { _ => return; 1 }.count() } + def testMapPartitions(rdd: RDD[Int]): Unit = { rdd.mapPartitions { it => return; it }.count() } + def testMapPartitionsWithIndex(rdd: RDD[Int]): Unit = { + rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() + } + def testZipPartitions2(rdd: RDD[Int]): Unit = { + rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count() + } + def testZipPartitions3(rdd: RDD[Int]): Unit = { + rdd.zipPartitions(rdd, rdd) { case (it1, it2, it3) => return; it1 }.count() + } + def testZipPartitions4(rdd: RDD[Int]): Unit = { + rdd.zipPartitions(rdd, rdd, rdd) { case (it1, it2, it3, it4) => return; it1 }.count() + } + def testForeach(rdd: RDD[Int]): Unit = { rdd.foreach { _ => return } } + def testForeachPartition(rdd: RDD[Int]): Unit = { rdd.foreachPartition { _ => return } } + def testReduce(rdd: RDD[Int]): Unit = { rdd.reduce { case (_, _) => return; 1 } } + def testTreeReduce(rdd: RDD[Int]): Unit = { rdd.treeReduce { case (_, _) => return; 1 } } + def testFold(rdd: RDD[Int]): Unit = { rdd.fold(0) { case (_, _) => return; 1 } } + def testAggregate(rdd: RDD[Int]): Unit = { + rdd.aggregate(0)({ case (_, _) => return; 1 }, { case (_, _) => return; 1 }) + } + def testTreeAggregate(rdd: RDD[Int]): Unit = { + rdd.treeAggregate(0)({ case (_, _) => return; 1 }, { case (_, _) => return; 1 }) + } + + // Test pair RDD functions + def testCombineByKey(rdd: RDD[(Int, Int)]): Unit = { + rdd.combineByKey( + { _ => return; 1 }: Int => Int, + { case (_, _) => return; 1 }: (Int, Int) => Int, + { case (_, _) => return; 1 }: (Int, Int) => Int + ).count() + } + def testAggregateByKey(rdd: RDD[(Int, Int)]): Unit = { + rdd.aggregateByKey(0)({ case (_, _) => return; 1 }, { case (_, _) => return; 1 }).count() + } + def testFoldByKey(rdd: RDD[(Int, Int)]): Unit = { rdd.foldByKey(0) { case (_, _) => return; 1 } } + def testReduceByKey(rdd: RDD[(Int, Int)]): Unit = { rdd.reduceByKey { case (_, _) => return; 1 } } + def testMapValues(rdd: RDD[(Int, Int)]): Unit = { rdd.mapValues { _ => return; 1 } } + def testFlatMapValues(rdd: RDD[(Int, Int)]): Unit = { rdd.flatMapValues { _ => return; Seq() } } + + // Test async RDD actions + def testForeachAsync(rdd: RDD[Int]): Unit = { rdd.foreachAsync { _ => return } } + def testForeachPartitionAsync(rdd: RDD[Int]): Unit = { rdd.foreachPartitionAsync { _ => return } } + + // Test SparkContext runJob + def testRunJob1(sc: SparkContext): Unit = { + val rdd = sc.parallelize(1 to 10, 10) + sc.runJob(rdd, { (ctx: TaskContext, iter: Iterator[Int]) => return; 1 } ) + } + def testRunJob2(sc: SparkContext): Unit = { + val rdd = sc.parallelize(1 to 10, 10) + sc.runJob(rdd, { iter: Iterator[Int] => return; 1 } ) + } +} From 9ac5f9b818713f8a7699f44994e4dbab2ebc9dba Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 29 Apr 2015 12:51:23 -0700 Subject: [PATCH 2/6] Clean closures that are not currently cleaned Now the test added in the previous commit passes! 
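
The pattern is the same for each API that was missing a clean: clean the user-provided
function once, up front, and have any closure we construct internally capture only the
cleaned copy. Previously these methods either passed the raw user function through or
wrapped it in a new lambda and cleaned only the wrapper, so the cleaner's checks
(including the return-statement scan the new tests rely on) were never applied to the
user's closure itself. A before/after sketch of the shape of the change, using keyBy as
the example:

    // Before: only the wrapper `x => (f(x), x)` is cleaned; `f` itself is not.
    def keyBy[K](f: T => K): RDD[(K, T)] = {
      map(x => (f(x), x))
    }

    // After: clean the user closure first, then build the wrapper around the cleaned copy.
    def keyBy[K](f: T => K): RDD[(K, T)] = {
      val cleanedF = sc.clean(f)
      map(x => (cleanedF(x), x))
    }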
--- .../scala/org/apache/spark/SparkContext.scala | 3 ++- .../org/apache/spark/rdd/AsyncRDDActions.scala | 3 ++- .../org/apache/spark/rdd/PairRDDFunctions.scala | 8 ++++++-- .../main/scala/org/apache/spark/rdd/RDD.scala | 17 ++++++++++++----- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d0cf2a8dd01c..cab244877857 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1632,7 +1632,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { - runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) + val cleanedFunc = clean(func) + runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 3406a7e97e36..c412837d4158 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -119,7 +119,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi * Applies a function f to each partition of this RDD. */ def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = { - self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length), + val cleanedF = self.context.clean(f) + self.context.submitJob[T, Unit, Unit](self, cleanedF, Range(0, self.partitions.length), (index, data) => Unit, Unit) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 05351ba4ff76..d3a4e967035a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -131,7 +131,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) - combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner) + val cleanedSeqOp = self.context.clean(seqOp) + val cleanedCombOp = self.context.clean(combOp) + combineByKey[U]( + (v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, cleanedCombOp, partitioner) } /** @@ -177,7 +180,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) - combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner) + val cleanedFunc = self.context.clean(func) + combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d80d94a58834..366646619cde 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -637,8 +637,11 @@ abstract class RDD[T: ClassTag]( */ def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { - val func = (context: TaskContext, index: Int, iter: Iterator[T]) => 
f(iter) - new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) + val cleanedF = sc.clean(f) + new MapPartitionsRDD( + this, + (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), + preservesPartitioning) } /** @@ -650,8 +653,11 @@ abstract class RDD[T: ClassTag]( */ def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { - val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter) - new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) + val cleanedF = sc.clean(f) + new MapPartitionsRDD( + this, + (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter), + preservesPartitioning) } /** @@ -1334,7 +1340,8 @@ abstract class RDD[T: ClassTag]( * Creates tuples of the elements in this RDD by applying `f`. */ def keyBy[K](f: T => K): RDD[(K, T)] = { - map(x => (f(x), x)) + val cleanedF = sc.clean(f) + map(x => (cleanedF(x), x)) } /** A private method for tests, to look at the contents of each partition */ From 8ac3074282dcb222a442e0d649256806bcf1f0c2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 29 Apr 2015 13:00:19 -0700 Subject: [PATCH 3/6] Prevent NPE in tests when CC is used outside of an app --- .../src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index fb2d873269fb..0009c5826c2b 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -161,7 +161,9 @@ private[spark] object ClosureCleaner extends Logging { private def ensureSerializable(func: AnyRef) { try { - SparkEnv.get.closureSerializer.newInstance().serialize(func) + if (SparkEnv.get != null) { + SparkEnv.get.closureSerializer.newInstance().serialize(func) + } } catch { case ex: Exception => throw new SparkException("Task not serializable", ex) } From e83699ef94a47b17ee4ea7fe07678b3b48633c89 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 29 Apr 2015 13:18:24 -0700 Subject: [PATCH 4/6] Clean one more --- .../scala/org/apache/spark/SparkContext.scala | 3 ++- .../spark/util/ClosureCleanerSuite.scala | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cab244877857..0741bffaa5b1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1687,7 +1687,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val callSite = getCallSite logInfo("Starting job: " + callSite.shortForm) val start = System.nanoTime - val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, + val cleanedFunc = clean(func) + val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout, localProperties.get) logInfo( "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s") diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 5b83d8612f0a..252523cb2d78 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.FunSuite import org.apache.spark.LocalSparkContext._ import org.apache.spark.{TaskContext, SparkContext, SparkException} +import org.apache.spark.partial.CountEvaluator import org.apache.spark.rdd.RDD class ClosureCleanerSuite extends FunSuite { @@ -110,6 +111,8 @@ class ClosureCleanerSuite extends FunSuite { expectCorrectException { TestUserClosuresActuallyCleaned.testForeachPartitionAsync(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testRunJob1(sc) } expectCorrectException { TestUserClosuresActuallyCleaned.testRunJob2(sc) } + expectCorrectException { TestUserClosuresActuallyCleaned.testRunApproximateJob(sc) } + expectCorrectException { TestUserClosuresActuallyCleaned.testSubmitJob(sc) } } } } @@ -299,4 +302,20 @@ private object TestUserClosuresActuallyCleaned { val rdd = sc.parallelize(1 to 10, 10) sc.runJob(rdd, { iter: Iterator[Int] => return; 1 } ) } + def testRunApproximateJob(sc: SparkContext): Unit = { + val rdd = sc.parallelize(1 to 10, 10) + val evaluator = new CountEvaluator(1, 0.5) + sc.runApproximateJob( + rdd, { (ctx: TaskContext, iter: Iterator[Int]) => return; 1L }, evaluator, 1000) + } + def testSubmitJob(sc: SparkContext): Unit = { + val rdd = sc.parallelize(1 to 10, 10) + sc.submitJob( + rdd, + { _ => return; 1 }: Iterator[Int] => Int, + Seq.empty, + { case (_, _) => return }: (Int, Int) => Unit, + { return } + ) + } } From 6498f4427667bf3e956e9e5903c80742a511111d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 29 Apr 2015 17:33:38 -0700 Subject: [PATCH 5/6] Add missing test for groupBy --- .../test/scala/org/apache/spark/util/ClosureCleanerSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 252523cb2d78..bc9c8b55bb97 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -88,6 +88,7 @@ class ClosureCleanerSuite extends FunSuite { expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMap(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testFilter(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testSortBy(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testGroupBy(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) } From df3caa32625da23a10d1b4f7ca7b9ea595e751d2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 1 May 2015 18:08:43 -0700 Subject: [PATCH 6/6] Address comments --- .../main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 3 +-- .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index c412837d4158..3406a7e97e36 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -119,8 +119,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi * Applies a function f to each partition of this RDD. 
*/ def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = { - val cleanedF = self.context.clean(f) - self.context.submitJob[T, Unit, Unit](self, cleanedF, Range(0, self.partitions.length), + self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length), (index, data) => Unit, Unit) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index d3a4e967035a..fe3da7adf419 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -131,10 +131,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) + // We will clean the combiner closure later in `combineByKey` val cleanedSeqOp = self.context.clean(seqOp) - val cleanedCombOp = self.context.clean(combOp) - combineByKey[U]( - (v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, cleanedCombOp, partitioner) + combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner) } /**
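
Taken together, this series means that a return statement (or a serialization problem)
in a closure passed to any of these public APIs now fails fast at cleaning time, before
a job is submitted. A minimal usage sketch of the user-visible behavior, assuming a
local SparkContext `sc` (the object and method names here are illustrative only):

    import org.apache.spark.SparkContext

    object ReturnInClosureExample {
      // The non-local `return` compiles to scala.runtime.NonLocalReturnControl, which
      // ReturnStatementFinder detects when `map` cleans the closure, so this is expected
      // to throw ReturnStatementInClosureException (a SparkException subclass) before
      // count() ever submits a job.
      def run(sc: SparkContext): Unit = {
        sc.parallelize(1 to 10).map { i => return; i }.count()
      }
    }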