From fe42a5e8f5d002d22bd53a4cbcb81607efa10ab1 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 17 Jun 2014 01:16:01 -0700 Subject: [PATCH 1/9] add treeAggregate --- .../main/scala/org/apache/spark/rdd/RDD.scala | 24 +++++++++++++++++++ .../scala/org/apache/spark/rdd/RDDSuite.scala | 10 ++++++++ 2 files changed, 34 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 27cc60d775788..a963f34929108 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -862,6 +862,30 @@ abstract class RDD[T: ClassTag]( jobResult } + @DeveloperApi + def treeAggregate[U: ClassTag](zeroValue: U)( + seqOp: (U, T) => U, + combOp: (U, U) => U, + level: Int): U = { + require(level >= 1, s"Level must be greater than 1 but got $level.") + if (this.partitions.size == 0) { + return Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) + } + val cleanSeqOp = sc.clean(seqOp) + val cleanCombOp = sc.clean(combOp) + val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) + var local = this.mapPartitions(it => Iterator(aggregatePartition(it))) + var numPartitions = local.partitions.size + val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / level)).toInt, 2) + while (numPartitions > scale + numPartitions / scale) { + numPartitions /= scale + local = local.mapPartitionsWithIndex { (i, iter) => + iter.map((i % numPartitions, _)) + }.reduceByKey(new HashPartitioner(numPartitions), cleanCombOp).values + } + local.reduce(cleanCombOp) + } + /** * Return the number of elements in the RDD. */ diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index e94a1e76d410c..28ad63c998e41 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -769,4 +769,14 @@ class RDDSuite extends FunSuite with SharedSparkContext { mutableDependencies += dep } } + + test("treeAggregate") { + val rdd = sc.makeRDD(-1000 until 1000, 10) + def seqOp = (c: Long, x: Int) => c + x + def combOp = (c1: Long, c2: Long) => c1 + c2 + for (level <- 1 until 10) { + val sum = rdd.treeAggregate(0L)(seqOp, combOp, level) + assert(sum === -1000L) + } + } } From eb71c330973fe3392a08882788553fcba28e7541 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 17 Jun 2014 01:40:03 -0700 Subject: [PATCH 2/9] add treeReduce --- .../main/scala/org/apache/spark/rdd/RDD.scala | 26 +++++++++++++++++++ .../scala/org/apache/spark/rdd/RDDSuite.scala | 8 ++++++ 2 files changed, 34 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a963f34929108..dbde4a8db0011 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -827,6 +827,32 @@ abstract class RDD[T: ClassTag]( jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) } + def treeReduce(f: (T, T) => T, level: Int): T = { + require(level >= 1, s"Level must be greater than 1 but got $level.") + val cleanF = sc.clean(f) + val reducePartition: Iterator[T] => Option[T] = iter => { + if (iter.hasNext) { + Some(iter.reduceLeft(cleanF)) + } else { + None + } + } + val local = this.mapPartitions(it => Iterator(reducePartition(it))) + val op: (Option[T], Option[T]) => Option[T] = (c, x) => { + if (c.isDefined && 
x.isDefined) { + Some(cleanF(c.get, x.get)) + } else if (c.isDefined) { + c + } else if (x.isDefined) { + x + } else { + None + } + } + local.treeAggregate(Option.empty[T])(op, op, level) + .getOrElse(throw new UnsupportedOperationException("empty collection")) + } + /** * Aggregate the elements of each partition, and then the results for all the partitions, using a * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 28ad63c998e41..db46691990ce3 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -779,4 +779,12 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(sum === -1000L) } } + + test("treeReduce") { + val rdd = sc.makeRDD(-1000 until 1000, 10) + for (level <- 1 until 10) { + val sum = rdd.treeReduce(_ + _, level) + assert(sum === -1000) + } + } } From 0f944908cb4b5ce8b91456d103d913bfbf764687 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 17 Jun 2014 01:52:20 -0700 Subject: [PATCH 3/9] add docs --- .../main/scala/org/apache/spark/rdd/RDD.scala | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index dbde4a8db0011..be1ac1fc57049 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -827,8 +827,15 @@ abstract class RDD[T: ClassTag]( jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) } - def treeReduce(f: (T, T) => T, level: Int): T = { - require(level >= 1, s"Level must be greater than 1 but got $level.") + /** + * :: DeveloperApi :: + * Reduces the elements of this RDD in a tree pattern. + * @param depth suggested depth of the tree + * @see [[org.apache.spark.rdd.RDD#reduce]] + */ + @DeveloperApi + def treeReduce(f: (T, T) => T, depth: Int): T = { + require(depth >= 1, s"Depth must be greater than 1 but got $depth.") val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { if (iter.hasNext) { @@ -849,7 +856,7 @@ abstract class RDD[T: ClassTag]( None } } - local.treeAggregate(Option.empty[T])(op, op, level) + local.treeAggregate(Option.empty[T])(op, op, depth) .getOrElse(throw new UnsupportedOperationException("empty collection")) } @@ -888,12 +895,18 @@ abstract class RDD[T: ClassTag]( jobResult } + /** + * :: DeveloperApi :: + * Aggregates the elements of this RDD in a tree pattern. 
+ * @param depth suggested depth of the tree + * @see [[org.apache.spark.rdd.RDD#aggregate]] + */ @DeveloperApi def treeAggregate[U: ClassTag](zeroValue: U)( seqOp: (U, T) => U, combOp: (U, U) => U, - level: Int): U = { - require(level >= 1, s"Level must be greater than 1 but got $level.") + depth: Int): U = { + require(depth >= 1, s"Depth must be greater than 1 but got $depth.") if (this.partitions.size == 0) { return Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) } @@ -902,7 +915,7 @@ abstract class RDD[T: ClassTag]( val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) var local = this.mapPartitions(it => Iterator(aggregatePartition(it))) var numPartitions = local.partitions.size - val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / level)).toInt, 2) + val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) while (numPartitions > scale + numPartitions / scale) { numPartitions /= scale local = local.mapPartitionsWithIndex { (i, iter) => From be6a88a9ddebb26111b2df339f8e2217eec73033 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 17 Jun 2014 02:08:46 -0700 Subject: [PATCH 4/9] use treeAggregate in mllib --- .../mllib/linalg/distributed/RowMatrix.scala | 21 +++++++++---------- .../mllib/optimization/GradientDescent.scala | 4 ++-- .../spark/mllib/optimization/LBFGS.scala | 4 ++-- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 00d0b18c27a8d..af1258f5e7d50 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -208,13 +208,11 @@ class RowMatrix( val nt: Int = n * (n + 1) / 2 // Compute the upper triangular part of the gram matrix. 
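    // (Annotation, not part of the patch.) Each row v adds the rank-1 term v*v^T
    // via BLAS dspr into a packed upper-triangular buffer of nt = n*(n+1)/2
    // doubles; the change below merges those per-partition buffers with
    // treeAggregate at depth 2, adding one intermediate combine level instead of
    // shipping every buffer straight to the driver.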
- val GU = rows.aggregate(new BDV[Double](new Array[Double](nt)))( + val GU = rows.treeAggregate(new BDV[Double](new Array[Double](nt)))( seqOp = (U, v) => { RowMatrix.dspr(1.0, v, U.data) U - }, - combOp = (U1, U2) => U1 += U2 - ) + }, combOp = (U1, U2) => U1 += U2, 2) RowMatrix.triuToFull(n, GU.data) } @@ -309,10 +307,11 @@ class RowMatrix( s"We need at least $mem bytes of memory.") } - val (m, mean) = rows.aggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))( + val (m, mean) = rows.treeAggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))( seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze), - combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + s2._1, s1._2 += s2._2) - ) + combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => + (s1._1 + s2._1, s1._2 += s2._2), + 2) updateNumRows(m) @@ -371,10 +370,10 @@ class RowMatrix( */ def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = { val zeroValue = new ColumnStatisticsAggregator(numCols().toInt) - val summary = rows.map(_.toBreeze).aggregate[ColumnStatisticsAggregator](zeroValue)( - (aggregator, data) => aggregator.add(data), - (aggregator1, aggregator2) => aggregator1.merge(aggregator2) - ) + val summary = rows.map(_.toBreeze).treeAggregate[ColumnStatisticsAggregator](zeroValue)( + seqOp = (aggregator, data) => aggregator.add(data), + combOp = (aggregator1, aggregator2) => aggregator1.merge(aggregator2), + 2) updateNumRows(summary.count) summary } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 7030eeabe400a..e146044cd81a8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -175,14 +175,14 @@ object GradientDescent extends Logging { // Sample a subset (fraction miniBatchFraction) of the total data // compute and sum up the subgradients on this subset (this is one map-reduce) val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) - .aggregate((BDV.zeros[Double](weights.size), 0.0))( + .treeAggregate((BDV.zeros[Double](weights.size), 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad)) (grad, loss + l) }, combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => (grad1 += grad2, loss1 + loss2) - }) + }, 2) /** * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index 8f187c9df5102..937d83ad73b0e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -198,7 +198,7 @@ object LBFGS extends Logging { val localData = data val localGradient = gradient - val (gradientSum, lossSum) = localData.aggregate((BDV.zeros[Double](weights.size), 0.0))( + val (gradientSum, lossSum) = localData.treeAggregate((BDV.zeros[Double](weights.size), 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => val l = localGradient.compute( features, label, Vectors.fromBreeze(weights), Vectors.fromBreeze(grad)) @@ -206,7 +206,7 @@ object LBFGS extends Logging { }, combOp = (c1, c2) => 
(c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => (grad1 += grad2, loss1 + loss2) - }) + }, 2) /** * regVal is sum of weight squares if it's L2 updater; From d58a087a88026a30876480cb6f060934728c58f5 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 1 Jul 2014 10:54:30 -0700 Subject: [PATCH 5/9] move treeReduce and treeAggregate to mllib --- .../main/scala/org/apache/spark/rdd/RDD.scala | 63 ------------------- .../scala/org/apache/spark/rdd/RDDSuite.scala | 18 ------ .../mllib/linalg/distributed/RowMatrix.scala | 1 + .../mllib/optimization/GradientDescent.scala | 1 + .../spark/mllib/optimization/LBFGS.scala | 1 + .../apache/spark/mllib/rdd/RDDFunctions.scala | 62 ++++++++++++++++++ .../spark/mllib/rdd/RDDFunctionsSuite.scala | 18 ++++++ 7 files changed, 83 insertions(+), 81 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 29bbd42b97d7d..4e841bc992bff 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -839,39 +839,6 @@ abstract class RDD[T: ClassTag]( jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) } - /** - * :: DeveloperApi :: - * Reduces the elements of this RDD in a tree pattern. - * @param depth suggested depth of the tree - * @see [[org.apache.spark.rdd.RDD#reduce]] - */ - @DeveloperApi - def treeReduce(f: (T, T) => T, depth: Int): T = { - require(depth >= 1, s"Depth must be greater than 1 but got $depth.") - val cleanF = sc.clean(f) - val reducePartition: Iterator[T] => Option[T] = iter => { - if (iter.hasNext) { - Some(iter.reduceLeft(cleanF)) - } else { - None - } - } - val local = this.mapPartitions(it => Iterator(reducePartition(it))) - val op: (Option[T], Option[T]) => Option[T] = (c, x) => { - if (c.isDefined && x.isDefined) { - Some(cleanF(c.get, x.get)) - } else if (c.isDefined) { - c - } else if (x.isDefined) { - x - } else { - None - } - } - local.treeAggregate(Option.empty[T])(op, op, depth) - .getOrElse(throw new UnsupportedOperationException("empty collection")) - } - /** * Aggregate the elements of each partition, and then the results for all the partitions, using a * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to @@ -907,36 +874,6 @@ abstract class RDD[T: ClassTag]( jobResult } - /** - * :: DeveloperApi :: - * Aggregates the elements of this RDD in a tree pattern. 
- * @param depth suggested depth of the tree - * @see [[org.apache.spark.rdd.RDD#aggregate]] - */ - @DeveloperApi - def treeAggregate[U: ClassTag](zeroValue: U)( - seqOp: (U, T) => U, - combOp: (U, U) => U, - depth: Int): U = { - require(depth >= 1, s"Depth must be greater than 1 but got $depth.") - if (this.partitions.size == 0) { - return Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) - } - val cleanSeqOp = sc.clean(seqOp) - val cleanCombOp = sc.clean(combOp) - val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) - var local = this.mapPartitions(it => Iterator(aggregatePartition(it))) - var numPartitions = local.partitions.size - val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) - while (numPartitions > scale + numPartitions / scale) { - numPartitions /= scale - local = local.mapPartitionsWithIndex { (i, iter) => - iter.map((i % numPartitions, _)) - }.reduceByKey(new HashPartitioner(numPartitions), cleanCombOp).values - } - local.reduce(cleanCombOp) - } - /** * Return the number of elements in the RDD. */ diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index d53dcd750923c..0f9cbe213ea17 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -820,22 +820,4 @@ class RDDSuite extends FunSuite with SharedSparkContext { mutableDependencies += dep } } - - test("treeAggregate") { - val rdd = sc.makeRDD(-1000 until 1000, 10) - def seqOp = (c: Long, x: Int) => c + x - def combOp = (c1: Long, c2: Long) => c1 + c2 - for (level <- 1 until 10) { - val sum = rdd.treeAggregate(0L)(seqOp, combOp, level) - assert(sum === -1000L) - } - } - - test("treeReduce") { - val rdd = sc.makeRDD(-1000 until 1000, 10) - for (level <- 1 until 10) { - val sum = rdd.treeReduce(_ + _, level) - assert(sum === -1000) - } - } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 12986cc746eb1..dad6c358b2509 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -27,6 +27,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg._ import org.apache.spark.rdd.RDD import org.apache.spark.Logging +import org.apache.spark.mllib.rdd.RDDFunctions._ import org.apache.spark.mllib.stat.MultivariateStatisticalSummary /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index e146044cd81a8..69fe7a3e0c892 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -25,6 +25,7 @@ import org.apache.spark.annotation.{Experimental, DeveloperApi} import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.rdd.RDDFunctions._ /** * Class used to solve an optimization problem using Gradient Descent. 
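Note: the import added above brings the implicit conversion from RDDFunctions into
scope, so MLlib call sites can invoke treeAggregate directly on an RDD. A minimal
sketch of the call shape (assumes a live SparkContext named sc; not part of the
patch series):

    import org.apache.spark.mllib.rdd.RDDFunctions._

    val rdd = sc.parallelize(-1000 until 1000, numSlices = 100)
    val sum = rdd.treeAggregate(0L)(
      seqOp = (acc, x) => acc + x,          // fold Ints into a Long within each partition
      combOp = (acc1, acc2) => acc1 + acc2, // merge partial sums, one tree level at a time
      depth = 2)                            // the depth the MLlib call sites pass
    // sum == -1000L, the same result as rdd.aggregate(0L)(_ + _, _ + _)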
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index 18f6e62860007..22b9ee833d838 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -26,6 +26,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.rdd.RDDFunctions._ /** * :: DeveloperApi :: diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala index 365b5e75d7f75..8bc688b94824b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala @@ -20,7 +20,10 @@ package org.apache.spark.mllib.rdd import scala.language.implicitConversions import scala.reflect.ClassTag +import org.apache.spark.HashPartitioner +import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils /** * Machine learning specific RDD functions. @@ -44,6 +47,65 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) { new SlidingRDD[T](self, windowSize) } } + + /** + * Reduces the elements of this RDD in a tree pattern. + * @param depth suggested depth of the tree + * @see [[org.apache.spark.rdd.RDD#reduce]] + */ + def treeReduce(f: (T, T) => T, depth: Int): T = { + require(depth >= 1, s"Depth must be greater than 1 but got $depth.") + val cleanF = self.context.clean(f) + val reducePartition: Iterator[T] => Option[T] = iter => { + if (iter.hasNext) { + Some(iter.reduceLeft(cleanF)) + } else { + None + } + } + val local = self.mapPartitions(it => Iterator(reducePartition(it))) + val op: (Option[T], Option[T]) => Option[T] = (c, x) => { + if (c.isDefined && x.isDefined) { + Some(cleanF(c.get, x.get)) + } else if (c.isDefined) { + c + } else if (x.isDefined) { + x + } else { + None + } + } + RDDFunctions.fromRDD(local).treeAggregate(Option.empty[T])(op, op, depth) + .getOrElse(throw new UnsupportedOperationException("empty collection")) + } + + /** + * Aggregates the elements of this RDD in a tree pattern. 
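   * (Annotation, not part of the patch: "tree pattern" means the per-partition
   * partial aggregates are merged through intermediate reduceByKey stages, with
   * the partition count shrinking by a factor of about numPartitions^(1/depth)
   * per level, until a final reduce combines what is left.)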
+ * @param depth suggested depth of the tree + * @see [[org.apache.spark.rdd.RDD#aggregate]] + */ + def treeAggregate[U: ClassTag](zeroValue: U)( + seqOp: (U, T) => U, + combOp: (U, U) => U, + depth: Int): U = { + require(depth >= 1, s"Depth must be greater than 1 but got $depth.") + if (self.partitions.size == 0) { + return Utils.clone(zeroValue, self.context.env.closureSerializer.newInstance()) + } + val cleanSeqOp = self.context.clean(seqOp) + val cleanCombOp = self.context.clean(combOp) + val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) + var local = self.mapPartitions(it => Iterator(aggregatePartition(it))) + var numPartitions = local.partitions.size + val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) + while (numPartitions > scale + numPartitions / scale) { + numPartitions /= scale + local = local.mapPartitionsWithIndex { (i, iter) => + iter.map((i % numPartitions, _)) + }.reduceByKey(new HashPartitioner(numPartitions), cleanCombOp).values + } + local.reduce(cleanCombOp) + } } private[mllib] diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala index 3f3b10dfff35e..30f057ce04c44 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala @@ -46,4 +46,22 @@ class RDDFunctionsSuite extends FunSuite with LocalSparkContext { val expected = data.flatMap(x => x).sliding(3).toList assert(sliding.collect().toList === expected) } + + test("treeAggregate") { + val rdd = sc.makeRDD(-1000 until 1000, 10) + def seqOp = (c: Long, x: Int) => c + x + def combOp = (c1: Long, c2: Long) => c1 + c2 + for (level <- 1 until 10) { + val sum = rdd.treeAggregate(0L)(seqOp, combOp, level) + assert(sum === -1000L) + } + } + + test("treeReduce") { + val rdd = sc.makeRDD(-1000 until 1000, 10) + for (level <- 1 until 10) { + val sum = rdd.treeReduce(_ + _, level) + assert(sum === -1000) + } + } } From 7495681b5c3c615f27b65dbb0cf5a3a0f0d5d0d0 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sun, 27 Jul 2014 17:41:10 -0700 Subject: [PATCH 6/9] fix compile error --- .../main/scala/org/apache/spark/mllib/optimization/LBFGS.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index f992e983701a4..392f6dfb415e9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -200,7 +200,7 @@ object LBFGS extends Logging { val n = weights.length val bcWeights = data.context.broadcast(weights) - val (gradientSum, lossSum) = localData.treeAggregate((BDV.zeros[Double](n), 0.0))( + val (gradientSum, lossSum) = data.treeAggregate((BDV.zeros[Double](n), 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => val l = localGradient.compute( features, label, Vectors.fromBreeze(bcWeights.value), Vectors.fromBreeze(grad)) From 9bcc5d30563d897f531429ef6df6df0f041b31de Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sun, 27 Jul 2014 17:49:05 -0700 Subject: [PATCH 7/9] add depth for readability --- .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 4 ++-- .../org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 6 
deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 19bb7b9ebfd5c..ccfc2cd7714f0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -109,7 +109,7 @@ class RowMatrix( seqOp = (U, v) => { RowMatrix.dspr(1.0, v, U.data) U - }, combOp = (U1, U2) => U1 += U2, 2) + }, combOp = (U1, U2) => U1 += U2, depth = 2) RowMatrix.triuToFull(n, GU.data) } @@ -293,7 +293,7 @@ class RowMatrix( seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze), combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + s2._1, s1._2 += s2._2), - 2) + depth = 2) updateNumRows(m) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala index 30f057ce04c44..27a19f793242b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala @@ -51,16 +51,16 @@ class RDDFunctionsSuite extends FunSuite with LocalSparkContext { val rdd = sc.makeRDD(-1000 until 1000, 10) def seqOp = (c: Long, x: Int) => c + x def combOp = (c1: Long, c2: Long) => c1 + c2 - for (level <- 1 until 10) { - val sum = rdd.treeAggregate(0L)(seqOp, combOp, level) + for (depth <- 1 until 10) { + val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth) assert(sum === -1000L) } } test("treeReduce") { val rdd = sc.makeRDD(-1000 until 1000, 10) - for (level <- 1 until 10) { - val sum = rdd.treeReduce(_ + _, level) + for (depth <- 1 until 10) { + val sum = rdd.treeReduce(_ + _, depth) assert(sum === -1000) } } From b04b96a2228df72ac143d6dcf1cc51cefccebad8 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 29 Jul 2014 00:16:41 -0700 Subject: [PATCH 8/9] address comments --- .../mllib/linalg/distributed/RowMatrix.scala | 6 ++-- .../apache/spark/mllib/rdd/RDDFunctions.scala | 28 ++++++++++--------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index ccfc2cd7714f0..807d43a2713c4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -80,7 +80,7 @@ class RowMatrix( private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = { val n = numCols().toInt val vbr = rows.context.broadcast(v) - rows.aggregate(BDV.zeros[Double](n))( + rows.treeAggregate(BDV.zeros[Double](n))( seqOp = (U, r) => { val rBrz = r.toBreeze val a = rBrz.dot(vbr.value) @@ -93,8 +93,8 @@ class RowMatrix( } U }, - combOp = (U1, U2) => U1 += U2 - ) + combOp = (U1, U2) => U1 += U2, + depth = 2) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala index 8bc688b94824b..d5429d62bec07 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala @@ -54,7 +54,7 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) { * @see [[org.apache.spark.rdd.RDD#reduce]] */ def treeReduce(f: (T, T) => T, depth: Int): T = { - 
require(depth >= 1, s"Depth must be greater than 1 but got $depth.") + require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") val cleanF = self.context.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { if (iter.hasNext) { @@ -63,7 +63,7 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) { None } } - val local = self.mapPartitions(it => Iterator(reducePartition(it))) + val partiallyReduced = self.mapPartitions(it => Iterator(reducePartition(it))) val op: (Option[T], Option[T]) => Option[T] = (c, x) => { if (c.isDefined && x.isDefined) { Some(cleanF(c.get, x.get)) @@ -75,7 +75,7 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) { None } } - RDDFunctions.fromRDD(local).treeAggregate(Option.empty[T])(op, op, depth) + RDDFunctions.fromRDD(partiallyReduced).treeAggregate(Option.empty[T])(op, op, depth) .getOrElse(throw new UnsupportedOperationException("empty collection")) } @@ -85,26 +85,28 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) { * @see [[org.apache.spark.rdd.RDD#aggregate]] */ def treeAggregate[U: ClassTag](zeroValue: U)( - seqOp: (U, T) => U, - combOp: (U, U) => U, - depth: Int): U = { - require(depth >= 1, s"Depth must be greater than 1 but got $depth.") + seqOp: (U, T) => U, + combOp: (U, U) => U, + depth: Int): U = { + require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") if (self.partitions.size == 0) { return Utils.clone(zeroValue, self.context.env.closureSerializer.newInstance()) } val cleanSeqOp = self.context.clean(seqOp) val cleanCombOp = self.context.clean(combOp) val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) - var local = self.mapPartitions(it => Iterator(aggregatePartition(it))) - var numPartitions = local.partitions.size + var partiallyAggregated = self.mapPartitions(it => Iterator(aggregatePartition(it))) + var numPartitions = partiallyAggregated.partitions.size val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) + // If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation. 
while (numPartitions > scale + numPartitions / scale) { numPartitions /= scale - local = local.mapPartitionsWithIndex { (i, iter) => - iter.map((i % numPartitions, _)) - }.reduceByKey(new HashPartitioner(numPartitions), cleanCombOp).values + val curNumPartitions = numPartitions + partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) => + iter.map((i % curNumPartitions, _)) + }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values } - local.reduce(cleanCombOp) + partiallyAggregated.reduce(cleanCombOp) } } From c6cd267aeb7effe245bfdc2d0d6f00ac8b5f6d02 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 29 Jul 2014 00:23:49 -0700 Subject: [PATCH 9/9] make depth default to 2 --- .../spark/mllib/linalg/distributed/RowMatrix.scala | 13 +++++-------- .../spark/mllib/optimization/GradientDescent.scala | 2 +- .../apache/spark/mllib/optimization/LBFGS.scala | 2 +- .../org/apache/spark/mllib/rdd/RDDFunctions.scala | 14 ++++++++------ 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 807d43a2713c4..58c1322757a43 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -92,9 +92,7 @@ class RowMatrix( s"Do not support vector operation from type ${rBrz.getClass.getName}.") } U - }, - combOp = (U1, U2) => U1 += U2, - depth = 2) + }, combOp = (U1, U2) => U1 += U2) } /** @@ -109,7 +107,7 @@ class RowMatrix( seqOp = (U, v) => { RowMatrix.dspr(1.0, v, U.data) U - }, combOp = (U1, U2) => U1 += U2, depth = 2) + }, combOp = (U1, U2) => U1 += U2) RowMatrix.triuToFull(n, GU.data) } @@ -292,8 +290,8 @@ class RowMatrix( val (m, mean) = rows.treeAggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))( seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze), combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => - (s1._1 + s2._1, s1._2 += s2._2), - depth = 2) + (s1._1 + s2._1, s1._2 += s2._2) + ) updateNumRows(m) @@ -355,8 +353,7 @@ class RowMatrix( def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = { val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)( (aggregator, data) => aggregator.add(data), - (aggregator1, aggregator2) => aggregator1.merge(aggregator2), - depth = 2) + (aggregator1, aggregator2) => aggregator1.merge(aggregator2)) updateNumRows(summary.count) summary } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 2db64ed1fd1ac..356aa949afcf5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -185,7 +185,7 @@ object GradientDescent extends Logging { }, combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => (grad1 += grad2, loss1 + loss2) - }, depth = 2) + }) /** * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index 392f6dfb415e9..26a2b62e76ed0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -208,7 +208,7 @@ object LBFGS extends Logging { }, combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => (grad1 += grad2, loss1 + loss2) - }, depth = 2) + }) /** * regVal is sum of weight squares if it's L2 updater; diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala index d5429d62bec07..b5e403bc8c14d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala @@ -49,11 +49,12 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) { } /** - * Reduces the elements of this RDD in a tree pattern. - * @param depth suggested depth of the tree + * Reduces the elements of this RDD in a multi-level tree pattern. + * + * @param depth suggested depth of the tree (default: 2) * @see [[org.apache.spark.rdd.RDD#reduce]] */ - def treeReduce(f: (T, T) => T, depth: Int): T = { + def treeReduce(f: (T, T) => T, depth: Int = 2): T = { require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") val cleanF = self.context.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { @@ -80,14 +81,15 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) { } /** - * Aggregates the elements of this RDD in a tree pattern. - * @param depth suggested depth of the tree + * Aggregates the elements of this RDD in a multi-level tree pattern. + * + * @param depth suggested depth of the tree (default: 2) * @see [[org.apache.spark.rdd.RDD#aggregate]] */ def treeAggregate[U: ClassTag](zeroValue: U)( seqOp: (U, T) => U, combOp: (U, U) => U, - depth: Int): U = { + depth: Int = 2): U = { require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") if (self.partitions.size == 0) { return Utils.clone(zeroValue, self.context.env.closureSerializer.newInstance())