From d2cbcddab12a62f743fd78afc5d483f3ce161868 Mon Sep 17 00:00:00 2001 From: jtengyp Date: Mon, 8 May 2017 15:49:33 +0800 Subject: [PATCH] Update CartesianRDD.scala In compute, group each iterator into multiple groups, reducing repeated data fetching. --- core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 57108dcedcf0..88607d508de2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -72,8 +72,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = { val currSplit = split.asInstanceOf[CartesianPartition] - for (x <- rdd1.iterator(currSplit.s1, context); - y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) + val groupSize = 500; + for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize); + y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize); + i <- x; j <- y) yield (i, j) } override def getDependencies: Seq[Dependency[_]] = List(