From 93e3cb3b5127e27842f55870e25f876cb5e4ee1a Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Wed, 6 Apr 2016 13:55:19 -0700
Subject: [PATCH 1/5] Changed RDD.treeAggregate to use fold instead of reduce

---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 032939b49a708..0c0c9e349aa1f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1061,7 +1061,7 @@ abstract class RDD[T: ClassTag](
    * Aggregates the elements of this RDD in a multi-level tree pattern.
    *
    * @param depth suggested depth of the tree (default: 2)
-   * @see [[org.apache.spark.rdd.RDD#aggregate]]
+   * @see [[org.apache.spark.rdd.RDD#aggregate]] These two methods have identical semantics.
    */
   def treeAggregate[U: ClassTag](zeroValue: U)(
       seqOp: (U, T) => U,
@@ -1087,9 +1087,9 @@ abstract class RDD[T: ClassTag](
         val curNumPartitions = numPartitions
         partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
           (i, iter) => iter.map((i % curNumPartitions, _))
-        }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+        }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
-      partiallyAggregated.reduce(cleanCombOp)
+      partiallyAggregated.fold(zeroValue)(cleanCombOp)
     }
   }
 
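Note on patch 1: RDD.reduce throws an UnsupportedOperationException when there are
no elements to merge, while RDD.fold and RDD.aggregate return the zero value, so
switching treeAggregate to fold lines it up with the aggregate semantics the updated
scaladoc promises. A minimal standalone sketch of the difference, assuming a local
SparkContext named sc; this snippet is illustration only, not part of the patch:

    val empty = sc.parallelize(Seq.empty[Int], numSlices = 2)

    // fold and aggregate fall back to the zero value when there is nothing to merge.
    empty.fold(0)(_ + _)                  // returns 0
    empty.aggregate(0)(_ + _, _ + _)      // returns 0

    // reduce has no zero value to fall back on.
    // empty.reduce(_ + _)                // throws UnsupportedOperationException
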
From 0d05b96b528dd75af8f7cef605c488a1bc9554b0 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Thu, 7 Apr 2016 16:58:38 -0700
Subject: [PATCH 2/5] Still testing treeAggregate implementations

---
 .../main/scala/org/apache/spark/rdd/RDD.scala | 10 +++++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala | 30 ++++++++++++++++++-
 2 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0c0c9e349aa1f..327473dbd4adb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1075,7 +1075,7 @@ abstract class RDD[T: ClassTag](
       val cleanCombOp = context.clean(combOp)
       val aggregatePartition =
         (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-      var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
+      var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
       var numPartitions = partiallyAggregated.partitions.length
       val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
       // If creating an extra level doesn't help reduce
@@ -1087,9 +1087,13 @@ abstract class RDD[T: ClassTag](
         val curNumPartitions = numPartitions
         partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
           (i, iter) => iter.map((i % curNumPartitions, _))
-        }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
+        }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+        // This fails:
+        // .foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
-      partiallyAggregated.fold(zeroValue)(cleanCombOp)
+      partiallyAggregated.reduce(cleanCombOp)
+      // This fails:
+      // partiallyAggregated.fold(zeroValue)(cleanCombOp)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 24daedab2090f..6b29ffe17393b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -159,6 +159,22 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(ser.serialize(union.partitions.head).limit() < 2000)
   }
 
+  test("fold") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10)
+    def op: (Int, Int) => Int = (c: Int, x: Int) => c + x
+    val sum = rdd.fold(0)(op)
+    assert(sum === -1000)
+  }
+
+  test("fold with op modifying first arg") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+    def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+      c(0) += x(0)
+      c
+    }
+    val sum = rdd.fold(Array(0))(op)
+  }
+
   test("aggregate") {
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
@@ -185,7 +201,19 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
     for (depth <- 1 until 10) {
       val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
-      assert(sum === -1000L)
+      assert(sum === -1000)
+    }
+  }
+
+  test("treeAggregate with ops modifying first args") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+    def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+      c(0) += x(0)
+      c
+    }
+    for (depth <- 1 until 10) {
+      val sum = rdd.treeAggregate(Array(0))(op, op, depth)
+      assert(sum === -1000)
     }
   }
 

From 7fcc10d30f87ac0f145a653ad25296cdf3d2f648 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Thu, 7 Apr 2016 17:07:08 -0700
Subject: [PATCH 3/5] fixed bug in treeAgg test

---
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6b29ffe17393b..bff87b8eb663d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -213,7 +213,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     }
     for (depth <- 1 until 10) {
       val sum = rdd.treeAggregate(Array(0))(op, op, depth)
-      assert(sum === -1000)
+      assert(sum(0) === -1000)
     }
   }
 
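Note on the tests added in patches 2 and 3: the scaladoc for RDD.fold and
RDD.aggregate explicitly allows the ops to modify and return their first argument
to avoid object allocation, so a correct treeAggregate must never let two merges
share one zero-value instance. A condensed sketch of the pattern these tests
protect, assuming a local SparkContext named sc; illustration only, not part of
the patch:

    // Wrap each element in a one-element array so the op can accumulate in place.
    val rdd = sc.parallelize(-1000 until 1000, numSlices = 10).map(x => Array(x))

    // The op mutates and returns its first argument, as the fold/aggregate
    // contracts permit.
    val op = (c: Array[Int], x: Array[Int]) => { c(0) += x(0); c }

    assert(rdd.fold(Array(0))(op).apply(0) == -1000)
    assert(rdd.treeAggregate(Array(0))(op, op, depth = 2).apply(0) == -1000)

If any step reuses an already-mutated zero value, the in-place += double-counts
and the sums drift away from -1000, which is the failure mode the "This fails"
comments in patch 2 record.
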
Bradley" Date: Thu, 7 Apr 2016 17:10:59 -0700 Subject: [PATCH 4/5] Fixed incorrect statement about failure --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 327473dbd4adb..5e550f49d8da1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1087,13 +1087,11 @@ abstract class RDD[T: ClassTag]( val curNumPartitions = numPartitions partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) => iter.map((i % curNumPartitions, _)) - }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values - // This fails: - // .foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values + }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values } - partiallyAggregated.reduce(cleanCombOp) + //partiallyAggregated.reduce(cleanCombOp) // This fails: - // partiallyAggregated.fold(zeroValue)(cleanCombOp) + partiallyAggregated.fold(zeroValue)(cleanCombOp) } } From 02d107aaa4935937075f422256619ccd88bc0cbd Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Thu, 7 Apr 2016 17:51:50 -0700 Subject: [PATCH 5/5] Fixed bug in treeAggregate using fold --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 5e550f49d8da1..71c733bafb6ff 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1089,9 +1089,8 @@ abstract class RDD[T: ClassTag]( (i, iter) => iter.map((i % curNumPartitions, _)) }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values } - //partiallyAggregated.reduce(cleanCombOp) - // This fails: - partiallyAggregated.fold(zeroValue)(cleanCombOp) + val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) + partiallyAggregated.fold(copiedZeroValue)(cleanCombOp) } }