Commit df3caa3

Andrew Or committed: Address comments

1 parent 7a3cc80  commit df3caa3

File tree

2 files changed: +3 -5 lines changed


core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 1 addition & 2 deletions
@@ -119,8 +119,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
-    val cleanedF = self.context.clean(f)
-    self.context.submitJob[T, Unit, Unit](self, cleanedF, Range(0, self.partitions.length),
+    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }
 }
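For context, below is a minimal usage sketch of foreachPartitionAsync, the method touched in the hunk above. The local SparkContext setup, the sample data, and the per-partition side effect are illustrative assumptions, not part of this commit; the closure passed in is the `f` that the hunk now hands to submitJob without pre-cleaning.

import org.apache.spark.{SparkConf, SparkContext}

import scala.concurrent.Await
import scala.concurrent.duration._

object ForeachPartitionAsyncSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup; any SparkContext behaves the same here.
    val conf = new SparkConf().setMaster("local[2]").setAppName("foreachPartitionAsync-sketch")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // Returns a FutureAction[Unit] immediately; the job runs asynchronously.
    // The implicit conversion from RDD to AsyncRDDActions is picked up automatically.
    val future = rdd.foreachPartitionAsync { iter =>
      println(s"partition has ${iter.size} elements")
    }

    // FutureAction extends scala.concurrent.Future, so we can wait on it.
    Await.ready(future, 1.minute)
    sc.stop()
  }
}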

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 3 deletions
@@ -131,10 +131,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
 
+    // We will clean the combiner closure later in `combineByKey`
     val cleanedSeqOp = self.context.clean(seqOp)
-    val cleanedCombOp = self.context.clean(combOp)
-    combineByKey[U](
-      (v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, cleanedCombOp, partitioner)
+    combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
   }
 
   /**
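For context, the hunk above sits in an aggregation path of PairRDDFunctions: the seqOp/combOp pair and the serialized zero value suggest aggregateByKey, where only the sequence closure is now cleaned eagerly and the combiner is left for combineByKey to clean. Below is a minimal, hypothetical aggregateByKey sketch; the SparkContext setup and sample data are assumptions for illustration only, not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeySketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup.
    val conf = new SparkConf().setMaster("local[2]").setAppName("aggregateByKey-sketch")
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)), numSlices = 2)

    // seqOp folds each value into a per-partition (sum, count) accumulator;
    // combOp merges accumulators across partitions. These closures play the
    // roles of seqOp/combOp in the hunk above; the combiner is the one the
    // added comment says combineByKey will clean later.
    val sumAndCount = pairs.aggregateByKey((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )

    sumAndCount.collect().foreach { case (k, (sum, count)) =>
      println(s"$k -> mean ${sum.toDouble / count}")
    }

    sc.stop()
  }
}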
