Skip to content

Commit d44be2d

Browse files
author
Davies Liu
committed
fix bug
1 parent 1afb4f3 commit d44be2d

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import org.apache.spark.{Partition, Partitioner, TaskContext}
2424
/**
2525
* An RDD that applies a user provided function to every partition of the parent RDD, and
2626
* additionally allows the user to prepare each partition before computing the parent partition.
27+
*
28+
* TODO(davies): remove this once SPARK-10342 is fixed
2729
*/
2830
private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M: ClassTag](
2931
prev: RDD[T],
@@ -38,13 +40,24 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
3840

3941
override def getPartitions: Array[Partition] = firstParent[T].partitions
4042

41-
lazy val preparedArgument: M = preparePartition()
43+
private[this] var preparedArgument: Option[M] = None
44+
45+
def prepare(): Unit = {
46+
// This could be called multiple times
47+
if (preparedArgument.isEmpty) {
48+
preparedArgument = Some(preparePartition())
49+
}
50+
}
4251

4352
/**
4453
* Prepare a partition before computing it from its parent.
4554
*/
4655
override def compute(partition: Partition, context: TaskContext): Iterator[U] = {
47-
val prepared = preparedArgument
56+
prepare()
57+
// The same RDD could be called multiple times in one task, each call of compute() should
58+
// have sep
59+
val prepared = preparedArgument.get
60+
preparedArgument = None
4861
val parentIterator = firstParent[T].iterator(partition, context)
4962
executePartition(context, partition.index, prepared, parentIterator)
5063
}

core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
7676

7777
protected def tryPrepareChildren() {
7878
rdds.foreach {
79-
case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.preparedArgument
79+
case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare()
8080
case _ =>
8181
}
8282
}

0 commit comments

Comments
 (0)