From d089b2fc4c5a9ad508f6517ed42be6b5c4dc0549 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Wed, 6 Apr 2016 13:55:19 -0700
Subject: [PATCH 1/7] Changed RDD.treeAggregate to use fold instead of reduce

---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 63a87e7f09d8..402635222916 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1120,7 +1120,7 @@ abstract class RDD[T: ClassTag](
    * Aggregates the elements of this RDD in a multi-level tree pattern.
    *
    * @param depth suggested depth of the tree (default: 2)
-   * @see [[org.apache.spark.rdd.RDD#aggregate]]
+   * @see [[org.apache.spark.rdd.RDD#aggregate]] These two methods have identical semantics.
    */
   def treeAggregate[U: ClassTag](zeroValue: U)(
       seqOp: (U, T) => U,
@@ -1146,9 +1146,9 @@ abstract class RDD[T: ClassTag](
         val curNumPartitions = numPartitions
         partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
           (i, iter) => iter.map((i % curNumPartitions, _))
-        }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+        }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
-      partiallyAggregated.reduce(cleanCombOp)
+      partiallyAggregated.fold(zeroValue)(cleanCombOp)
     }
   }
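Why fold rather than reduce: fold takes an explicit identity element (zeroValue) and is therefore defined on empty inputs, whereas reduce throws. The local-collections sketch below is a minimal illustration of that distinction, which carries over to RDD.fold and RDD.reduce; the object and value names here are illustrative, not part of the patch.

    // Minimal sketch: fold has an identity element, reduce does not.
    import scala.util.Try

    object FoldVsReduceSketch {
      def main(args: Array[String]): Unit = {
        val nums = Seq(1, 2, 3)
        val empty = Seq.empty[Int]

        println(nums.fold(0)(_ + _))       // 6
        println(empty.fold(0)(_ + _))      // 0: the identity is returned

        // reduce has no identity, so an empty input throws, mirroring
        // RDD.reduce on an RDD with no elements.
        println(Try(empty.reduce(_ + _)))  // Failure(UnsupportedOperationException)
      }
    }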
From 246070eb16edbc1ebd77ec41acf0332e896e6287 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Thu, 7 Apr 2016 16:58:38 -0700
Subject: [PATCH 2/7] Still testing treeAggregate implementations

---
 .../main/scala/org/apache/spark/rdd/RDD.scala      | 10 +++++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala      | 30 ++++++++++++++++++-
 2 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 402635222916..bfd11aa6a7eb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1134,7 +1134,7 @@ abstract class RDD[T: ClassTag](
       val cleanCombOp = context.clean(combOp)
       val aggregatePartition =
         (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-      var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
+      var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
       var numPartitions = partiallyAggregated.partitions.length
       val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
       // If creating an extra level doesn't help reduce
@@ -1146,9 +1146,13 @@ abstract class RDD[T: ClassTag](
         val curNumPartitions = numPartitions
         partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
           (i, iter) => iter.map((i % curNumPartitions, _))
-        }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
+        }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+        // This fails:
+        // .foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
-      partiallyAggregated.fold(zeroValue)(cleanCombOp)
+      partiallyAggregated.reduce(cleanCombOp)
+      // This fails:
+      // partiallyAggregated.fold(zeroValue)(cleanCombOp)
     }
   }

diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 8d06f5468f4f..a41a0d297c75 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -192,6 +192,22 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(ser.serialize(union.partitions.head).limit() < 2000)
   }
 
+  test("fold") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10)
+    def op: (Int, Int) => Int = (c: Int, x: Int) => c + x
+    val sum = rdd.fold(0)(op)
+    assert(sum === -1000)
+  }
+
+  test("fold with op modifying first arg") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+    def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+      c(0) += x(0)
+      c
+    }
+    val sum = rdd.fold(Array(0))(op)
+  }
+
   test("aggregate") {
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
@@ -218,7 +234,19 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
     for (depth <- 1 until 10) {
       val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
-      assert(sum === -1000L)
+      assert(sum === -1000)
+    }
+  }
+
+  test("treeAggregate with ops modifying first args") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+    def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+      c(0) += x(0)
+      c
+    }
+    for (depth <- 1 until 10) {
+      val sum = rdd.treeAggregate(Array(0))(op, op, depth)
+      assert(sum === -1000)
     }
   }

From 326e9cf365105228599197ce164c3d9796e8f12d Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Thu, 7 Apr 2016 17:07:08 -0700
Subject: [PATCH 3/7] fixed bug in treeAgg test

---
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index a41a0d297c75..3bf433299f4a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -246,7 +246,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     }
     for (depth <- 1 until 10) {
       val sum = rdd.treeAggregate(Array(0))(op, op, depth)
-      assert(sum === -1000)
+      assert(sum(0) === -1000)
     }
   }
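Patches 2 and 3 add tests whose ops mutate and return their first argument (c(0) += x(0)), which Spark's aggregate contract permits. The hazard they probe: if one mutable zeroValue instance is shared across fold invocations, state leaks from one into the next. A self-contained local sketch of that failure mode, with hypothetical names rather than patch code:

    // Reusing one mutable zero across two folds leaks state from the first
    // fold into the second; a fresh zero per fold keeps them independent.
    object SharedZeroSketch {
      def op(c: Array[Int], x: Array[Int]): Array[Int] = { c(0) += x(0); c }

      def main(args: Array[String]): Unit = {
        val data = Seq(Array(1), Array(2), Array(3))

        val sharedZero = Array(0)
        println(data.fold(sharedZero)(op)(0))  // 6
        println(data.fold(sharedZero)(op)(0))  // 12: sharedZero was mutated to 6
        println(data.fold(Array(0))(op)(0))    // 6: fresh zero, correct again
      }
    }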
Bradley" Date: Thu, 7 Apr 2016 17:10:59 -0700 Subject: [PATCH 4/7] Fixed incorrect statement about failure --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index bfd11aa6a7eb..a7574083b9e3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1146,13 +1146,11 @@ abstract class RDD[T: ClassTag]( val curNumPartitions = numPartitions partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) => iter.map((i % curNumPartitions, _)) - }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values - // This fails: - // .foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values + }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values } - partiallyAggregated.reduce(cleanCombOp) + //partiallyAggregated.reduce(cleanCombOp) // This fails: - // partiallyAggregated.fold(zeroValue)(cleanCombOp) + partiallyAggregated.fold(zeroValue)(cleanCombOp) } } From 5121ff8cbccef6d276ef40785d79fd5eaf00ef98 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Thu, 7 Apr 2016 17:51:50 -0700 Subject: [PATCH 5/7] Fixed bug in treeAggregate using fold --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a7574083b9e3..2fb2630600d1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1148,9 +1148,8 @@ abstract class RDD[T: ClassTag]( (i, iter) => iter.map((i % curNumPartitions, _)) }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values } - //partiallyAggregated.reduce(cleanCombOp) - // This fails: - partiallyAggregated.fold(zeroValue)(cleanCombOp) + val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) + partiallyAggregated.fold(copiedZeroValue)(cleanCombOp) } } From 9eda23e02adb78a23f2638f414d1eea3816248f2 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 5 Jun 2017 13:05:35 +0900 Subject: [PATCH 6/7] Fix Javadoc8 error --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 2fb2630600d1..2985c9011946 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1118,9 +1118,9 @@ abstract class RDD[T: ClassTag]( /** * Aggregates the elements of this RDD in a multi-level tree pattern. + * This method is semantically identical to [[org.apache.spark.rdd.RDD#aggregate]]. * * @param depth suggested depth of the tree (default: 2) - * @see [[org.apache.spark.rdd.RDD#aggregate]] These two methods have identical semantics. 
From 9eda23e02adb78a23f2638f414d1eea3816248f2 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 5 Jun 2017 13:05:35 +0900
Subject: [PATCH 6/7] Fix Javadoc8 error

---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 2fb2630600d1..2985c9011946 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1118,9 +1118,9 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Aggregates the elements of this RDD in a multi-level tree pattern.
+   * This method is semantically identical to [[org.apache.spark.rdd.RDD#aggregate]].
    *
    * @param depth suggested depth of the tree (default: 2)
-   * @see [[org.apache.spark.rdd.RDD#aggregate]] These two methods have identical semantics.
    */
   def treeAggregate[U: ClassTag](zeroValue: U)(
       seqOp: (U, T) => U,

From 016d479df9a162b640032f31678abcfb668a078e Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 5 Jun 2017 13:27:31 +0900
Subject: [PATCH 7/7] Add a comparison in the tests in RDDSuite

---
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 3bf433299f4a..386c0060f9c4 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -206,6 +206,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
       c
     }
     val sum = rdd.fold(Array(0))(op)
+    assert(sum(0) === -1000)
   }
 
   test("aggregate") {
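With the full series applied, treeAggregate agrees with aggregate even when the zero value is a mutable object and the ops mutate their first argument. A minimal driver program exercising that contract; it assumes a local SparkContext and the patched treeAggregate, and the object and value names are illustrative, not part of the patch set:

    import org.apache.spark.{SparkConf, SparkContext}

    object TreeAggregateDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[4]").setAppName("treeAggregate-demo")
        val sc = new SparkContext(conf)
        try {
          val rdd = sc.parallelize(-1000 until 1000, 10).map(x => Array(x))
          // Like the new tests, op mutates and returns its first argument.
          def op(c: Array[Int], x: Array[Int]): Array[Int] = { c(0) += x(0); c }

          val viaAggregate = rdd.aggregate(Array(0))(op, op)(0)
          val viaTree = rdd.treeAggregate(Array(0))(op, op, depth = 3)(0)
          println(s"aggregate=$viaAggregate, treeAggregate=$viaTree")  // both -1000
        } finally {
          sc.stop()
        }
      }
    }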