
Commit b2d8c02

srowen authored and rxin committed
SPARK-6044 [CORE] RDD.aggregate() should not use the closure serializer on the zero value
Use the configured serializer in RDD.aggregate, to match PairRDDFunctions.aggregateByKey, instead of the closure serializer. Compare with https://github.com/apache/spark/blob/e60ad2f4c47b011be7a3198689ac2b82ee317d96/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L127

Author: Sean Owen <[email protected]>

Closes #5028 from srowen/SPARK-6044 and squashes the following commits:

a4040a7 [Sean Owen] Use configured serializer in RDD.aggregate, to match PairRDDFunctions.aggregateByKey, instead of closure serializer
1 parent b3e6eca · commit b2d8c02

File tree

1 file changed: +1 −1 lines

  • core/src/main/scala/org/apache/spark/rdd

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 1 addition & 1 deletion
@@ -960,7 +960,7 @@ abstract class RDD[T: ClassTag](
    */
   def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
     // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
     val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
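For context, a minimal pure-Scala sketch (no Spark required; the partition layout and example data are chosen purely for illustration) of what `aggregate` computes: `seqOp` folds each partition starting from a copy of `zeroValue`, then `combOp` merges the per-partition results. This is why the zero value must be cloned before being shipped inside tasks — a shared mutable zero would be accumulated into by every partition.

```scala
// Simulate two "partitions" of an RDD of Ints.
val partitions = Seq(Seq(1, 2), Seq(3, 4))

// zeroValue is a (sum, count) pair, as in the classic mean-via-aggregate example.
val zeroValue = (0, 0)

// seqOp: fold each element into the per-partition accumulator.
val perPartition = partitions.map(_.foldLeft(zeroValue) {
  case ((s, c), x) => (s + x, c + 1)
})

// combOp: merge the per-partition accumulators.
val (sum, count) = perPartition.foldLeft(zeroValue) {
  case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2)
}
// sum == 10, count == 4; sum.toDouble / count gives the mean.
```

The commit does not change these semantics; it only switches which serializer clones `zeroValue`, so that custom types registered with the configured serializer (e.g. Kryo) behave the same in `aggregate` as in `aggregateByKey`.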

0 commit comments
