
Commit 9ac5f9b

Author: Andrew Or
Clean closures that are not currently cleaned
Now the test added in the previous commit passes!
Parent: 19e33b4

4 files changed: 22 additions, 9 deletions
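
For background: SparkContext.clean delegates to ClosureCleaner, which nulls out references to enclosing scopes that a closure never actually uses and, by default, eagerly checks that the closure serializes. It cannot remove state a closure genuinely uses; the usual fix there is to copy fields into locals. A minimal sketch of both cases, assuming a live SparkContext; the Pipeline class is made up for illustration:

import org.apache.spark.SparkContext

// Illustrative only: `Pipeline` is not Spark code.
class Pipeline(sc: SparkContext) {   // not Serializable
  val suffix = "!"

  def broken(): Array[String] =
    sc.parallelize(Seq("a", "b"))
      .map(_ + suffix)               // reads this.suffix, so `this` is captured;
      .collect()                     // cleaning cannot help: "Task not serializable"

  def fixed(): Array[String] = {
    val s = suffix                   // copy the field to a local: no `this` capture
    sc.parallelize(Seq("a", "b")).map(_ + s).collect()
  }
}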

core/src/main/scala/org/apache/spark/SparkContext.scala (2 additions, 1 deletion)

@@ -1632,7 +1632,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       partitions: Seq[Int],
       allowLocal: Boolean
       ): Array[U] = {
-    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
+    val cleanedFunc = clean(func)
+    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
   }

   /**
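
Note: this Iterator-only runJob overload previously wrapped func without cleaning it, unlike the (TaskContext, Iterator[T]) => U overloads. A usage sketch, assuming a live SparkContext sc:

// Sums each partition; the closure now passes through clean() like the
// other runJob variants before the job is submitted.
val rdd = sc.parallelize(1 to 100, 4)
val partitionSums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum)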

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala (2 additions, 1 deletion)

@@ -119,7 +119,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
-    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
+    val cleanedF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, cleanedF, Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }
 }
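
A usage sketch, assuming a live SparkContext sc; f is now cleaned before the job is submitted:

import scala.concurrent.Await
import scala.concurrent.duration._

// FutureAction extends scala.concurrent.Future, so the usual combinators
// and Await work on the returned handle.
val action = sc.parallelize(1 to 8, 2).foreachPartitionAsync { it =>
  it.foreach(x => println(s"processing $x"))
}
Await.result(action, 1.minute)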

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala (6 additions, 2 deletions)

@@ -131,7 +131,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

-    combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
+    val cleanedSeqOp = self.context.clean(seqOp)
+    val cleanedCombOp = self.context.clean(combOp)
+    combineByKey[U](
+      (v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, cleanedCombOp, partitioner)
   }

   /**
@@ -177,7 +180,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

-    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
+    val cleanedFunc = self.context.clean(func)
+    combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
   }

   /**
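
Both hunks are on the path of the keyed aggregations: the first is inside aggregateByKey, the second inside foldByKey, and the simpler overloads delegate to these partitioner variants. A usage sketch, assuming a live sc (result ordering may vary):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// seqOp and combOp are now cleaned before being handed to combineByKey:
val aggregated = pairs.aggregateByKey(0)(_ + _, _ + _).collect()  // e.g. Array((a,3), (b,3))

// the single func is cleaned once and reused in all three positions:
val folded = pairs.foldByKey(0)(_ + _).collect()                  // e.g. Array((a,3), (b,3))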

core/src/main/scala/org/apache/spark/rdd/RDD.scala (12 additions, 5 deletions)

@@ -637,8 +637,11 @@ abstract class RDD[T: ClassTag](
    */
   def mapPartitions[U: ClassTag](
       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
-    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
-    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
+      preservesPartitioning)
   }

   /**
@@ -650,8 +653,11 @@ abstract class RDD[T: ClassTag](
    */
   def mapPartitionsWithIndex[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
-    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
-    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      preservesPartitioning)
   }

   /**
@@ -1334,7 +1340,8 @@ abstract class RDD[T: ClassTag](
    * Creates tuples of the elements in this RDD by applying `f`.
    */
   def keyBy[K](f: T => K): RDD[(K, T)] = {
-    map(x => (f(x), x))
+    val cleanedF = sc.clean(f)
+    map(x => (cleanedF(x), x))
   }

   /** A private method for tests, to look at the contents of each partition */
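
Of these, keyBy is the subtle one: map() already cleans the wrapper x => (f(x), x), but that does not strip unused enclosing references inside f itself. A hypothetical sketch of the kind of closure this helps; Helper is made up, and whether scalac emits the unused outer reference depends on the compiler version and nesting:

import org.apache.spark.SparkContext

class Helper(sc: SparkContext) {   // not Serializable
  def keyed(): Array[(Int, Int)] =
    // Under Scala 2.10/2.11 the key function may carry a compiler-generated
    // $outer pointing at this Helper even though its body never uses it.
    // Cleaning f nulls such unused references, so the task can serialize.
    sc.parallelize(1 to 4).keyBy(x => x % 2).collect()
}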
