From 66f33de8b4d8bb699aa088cfe37c7ef82db486ec Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 25 Feb 2020 18:39:30 +0800 Subject: [PATCH 1/8] init init init init init nit nit nit nit --- .../mllib/clustering/DistanceMeasure.scala | 153 +++++++++++++++++- .../spark/mllib/clustering/KMeans.scala | 36 +++-- .../spark/mllib/clustering/KMeansModel.scala | 14 +- 3 files changed, 179 insertions(+), 24 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala index e83dd3723be1..dd8f6482c070 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala @@ -24,16 +24,60 @@ import org.apache.spark.mllib.util.MLUtils private[spark] abstract class DistanceMeasure extends Serializable { + /** + * Radii of centers used in triangle inequality to obtain useful bounds to find + * closest centers. + * + * @see Charles Elkan, + * Using the Triangle Inequality to Accelerate k-Means + * + * @return Radii of centers. If distance between point x and center c is less than + * the radius of center c, then center c is the closest center to point x. + */ + def computeRadii(centers: Array[VectorWithNorm]): Array[Double] = { + val k = centers.length + Array.fill(k)(Double.NaN) + } + /** * @return the index of the closest center to the given point, as well as the cost. */ def findClosest( - centers: TraversableOnce[VectorWithNorm], + centers: Array[VectorWithNorm], + radii: Array[Double], point: VectorWithNorm): (Int, Double) = { var bestDistance = Double.PositiveInfinity var bestIndex = 0 var i = 0 - centers.foreach { center => + var found = false + while (i < centers.length && !found) { + val center = centers(i) + val d = distance(center, point) + val r = radii(i) + if (d < r) { + bestDistance = d + bestIndex = i + found = true + } else if (d < bestDistance) { + bestDistance = d + bestIndex = i + } + i += 1 + } + (bestIndex, bestDistance) + } + + /** + * @return the index of the closest center to the given point, as well as the cost. + */ + def findClosest( + centers: Array[VectorWithNorm], + point: VectorWithNorm): (Int, Double) = { + var bestDistance = Double.PositiveInfinity + var bestIndex = 0 + var i = 0 + while (i < centers.length) { + val center = centers(i) val currentDistance = distance(center, point) if (currentDistance < bestDistance) { bestDistance = currentDistance @@ -48,7 +92,7 @@ private[spark] abstract class DistanceMeasure extends Serializable { * @return the K-means cost of a given point against the given cluster centers. */ def pointCost( - centers: TraversableOnce[VectorWithNorm], + centers: Array[VectorWithNorm], point: VectorWithNorm): Double = { findClosest(centers, point)._2 } @@ -154,22 +198,86 @@ object DistanceMeasure { } private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { + + /** + * @return Radii of centers. If distance between point x and center c is less than + * the radius of center c, then center c is the closest center to point x. + * For Euclidean distance, radius of center c is half of the distance between + * center c and its closest center. 
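+   *         (A sketch of why the half-distance radius is safe: if d(x, c) < r(c), where
+   *         r(c) = min d(c, c') / 2 over all other centers c', then the triangle
+   *         inequality gives d(x, c') >= d(c, c') - d(x, c) >= 2 * r(c) - d(x, c) > d(x, c),
+   *         so no other center c' can be closer to x than c.)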
+ */ + override def computeRadii(centers: Array[VectorWithNorm]): Array[Double] = { + val k = centers.length + if (k == 1) { + Array(Double.NaN) + } else { + val distances = Array.fill(k)(Double.PositiveInfinity) + var i = 0 + while (i < k) { + var j = i + 1 + while (j < k) { + val d = distance(centers(i), centers(j)) + if (d < distances(i)) distances(i) = d + if (d < distances(j)) distances(j) = d + j += 1 + } + i += 1 + } + + distances.map(_ / 2) + } + } + + /** + * @return the index of the closest center to the given point, as well as the cost. + */ + override def findClosest( + centers: Array[VectorWithNorm], + radii: Array[Double], + point: VectorWithNorm): (Int, Double) = { + var bestDistance = Double.PositiveInfinity + var bestIndex = 0 + var i = 0 + var found = false + while (i < centers.length && !found) { + val center = centers(i) + // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary + // distance computation. + var lowerBoundOfSqDist = center.norm - point.norm + lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist + if (lowerBoundOfSqDist < bestDistance) { + val d = EuclideanDistanceMeasure.fastSquaredDistance(center, point) + val r = radii(i) + if (d < r * r) { + bestDistance = d + bestIndex = i + found = true + } else if (d < bestDistance) { + bestDistance = d + bestIndex = i + } + } + i += 1 + } + (bestIndex, bestDistance) + } + /** * @return the index of the closest center to the given point, as well as the squared distance. */ override def findClosest( - centers: TraversableOnce[VectorWithNorm], + centers: Array[VectorWithNorm], point: VectorWithNorm): (Int, Double) = { var bestDistance = Double.PositiveInfinity var bestIndex = 0 var i = 0 - centers.foreach { center => + while (i < centers.length) { + val center = centers(i) // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary // distance computation. var lowerBoundOfSqDist = center.norm - point.norm lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist if (lowerBoundOfSqDist < bestDistance) { - val distance: Double = EuclideanDistanceMeasure.fastSquaredDistance(center, point) + val distance = EuclideanDistanceMeasure.fastSquaredDistance(center, point) if (distance < bestDistance) { bestDistance = distance bestIndex = i @@ -234,6 +342,39 @@ private[spark] object EuclideanDistanceMeasure { } private[spark] class CosineDistanceMeasure extends DistanceMeasure { + + /** + * @return Radii of centers. If distance between point x and center c is less than + * the radius of center c, then center c is the closest center to point x. + * For Cosine distance, it is similar to Euclidean distance. However, here + * radian/angle is used instead of Cosine distance: for center c, finding + * its closest center, computing the radian/angle between them, halving the + * radian/angle, and converting it back to Cosine distance at the end. 
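+   *         Concretely, if d = 1 - cos(x) is the distance between a center and its
+   *         closest center, the half-angle identity cos(x/2) = sqrt((1 + cos(x)) / 2)
+   *         gives the radius r = 1 - cos(x/2) = 1 - sqrt(1 - d/2), as computed below.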
+ */ + override def computeRadii(centers: Array[VectorWithNorm]): Array[Double] = { + val k = centers.length + if (k == 1) { + Array(Double.NaN) + } else { + val distances = Array.fill(k)(Double.PositiveInfinity) + var i = 0 + while (i < k) { + var j = i + 1 + while (j < k) { + val d = distance(centers(i), centers(j)) + if (d < distances(i)) distances(i) = d + if (d < distances(j)) distances(j) = d + j += 1 + } + i += 1 + } + + // d = 1 - cos(x) + // r = 1 - cos(x/2) = 1 - sqrt((cos(x) + 1) / 2) = 1 - sqrt(1 - d/2) + distances.map(d => 1 - math.sqrt(1 - d / 2)) + } + } + /** * @param v1: first vector * @param v2: second vector diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index a3cf7f96478a..9091a319a44f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -266,6 +266,7 @@ class KMeans private ( var converged = false var cost = 0.0 var iteration = 0 + var totalRadiiTime = 0L val iterationStartTime = System.nanoTime() @@ -273,23 +274,27 @@ class KMeans private ( // Execute iterations of Lloyd's algorithm until converged while (iteration < maxIterations && !converged) { + val radiiStartTime = System.nanoTime() + val radii = distanceMeasureInstance.computeRadii(centers) + totalRadiiTime += System.nanoTime() - radiiStartTime + val costAccum = sc.doubleAccumulator - val bcCenters = sc.broadcast(centers) + val bcCenters = sc.broadcast((centers, radii)) // Find the new centers val collected = data.mapPartitions { points => - val thisCenters = bcCenters.value - val dims = thisCenters.head.vector.size + val (centers, radii) = bcCenters.value + val dims = centers.head.vector.size - val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) + val sums = Array.fill(centers.length)(Vectors.zeros(dims)) // clusterWeightSum is needed to calculate cluster center // cluster center = // sample1 * weight1/clusterWeightSum + sample2 * weight2/clusterWeightSum + ... 
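      // e.g. samples 1.0 (weight 2.0) and 4.0 (weight 1.0) in one cluster give
      //     the center (1.0 * 2.0 + 4.0 * 1.0) / (2.0 + 1.0) = 2.0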
- val clusterWeightSum = Array.ofDim[Double](thisCenters.length) + val clusterWeightSum = Array.ofDim[Double](centers.length) points.foreach { point => - val (bestCenter, cost) = distanceMeasureInstance.findClosest(thisCenters, point) + val (bestCenter, cost) = distanceMeasureInstance.findClosest(centers, radii, point) costAccum.add(cost * point.weight) distanceMeasureInstance.updateClusterSum(point, sums(bestCenter)) clusterWeightSum(bestCenter) += point.weight @@ -307,15 +312,12 @@ class KMeans private ( instr.foreach(_.logSumOfWeights(collected.values.map(_._2).sum)) } - val newCenters = collected.mapValues { case (sum, weightSum) => - distanceMeasureInstance.centroid(sum, weightSum) - } - bcCenters.destroy() // Update the cluster centers and costs converged = true - newCenters.foreach { case (j, newCenter) => + collected.foreach { case (j, (sum, weightSum)) => + val newCenter = distanceMeasureInstance.centroid(sum, weightSum) if (converged && !distanceMeasureInstance.isCenterConverged(centers(j), newCenter, epsilon)) { converged = false @@ -324,8 +326,10 @@ class KMeans private ( } cost = costAccum.value + instr.foreach(_.logNamedValue(s"Cost@iter=$iteration", s"$cost")) iteration += 1 } + logInfo(f"Radii computation took ${totalRadiiTime / 1e9}%.3f seconds.") val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.") @@ -372,7 +376,7 @@ class KMeans private ( require(sample.nonEmpty, s"No samples available from $data") val centers = ArrayBuffer[VectorWithNorm]() - var newCenters = Seq(sample.head.toDense) + var newCenters = Array(sample.head.toDense) centers ++= newCenters // On each step, sample 2 * k points on average with probability proportional @@ -404,10 +408,10 @@ class KMeans private ( costs.unpersist() bcNewCentersList.foreach(_.destroy()) - val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_)) + val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_)).toArray - if (distinctCenters.size <= k) { - distinctCenters.toArray + if (distinctCenters.length <= k) { + distinctCenters } else { // Finally, we might have a set of more than k distinct candidate centers; weight each // candidate by the number of points in the dataset mapping to it and run a local k-means++ @@ -420,7 +424,7 @@ class KMeans private ( bcCenters.destroy() val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray - LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30) + LocalKMeans.kMeansPlusPlus(0, distinctCenters, myWeights, k, 30) } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 0c6570ff81ef..9ee4f335b432 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -48,6 +48,13 @@ class KMeansModel (@Since("1.0.0") val clusterCenters: Array[Vector], @transient private lazy val clusterCentersWithNorm = if (clusterCenters == null) null else clusterCenters.map(new VectorWithNorm(_)) + // TODO: computation of radii may take seconds, so save it to KMeansModel in training + @transient private lazy val radii = if (clusterCenters == null) { + null + } else { + distanceMeasureInstance.computeRadii(clusterCentersWithNorm) + } + @Since("2.4.0") private[spark] def this(clusterCenters: Array[Vector], distanceMeasure: 
String) = this(clusterCenters: Array[Vector], distanceMeasure, 0.0, -1) @@ -73,7 +80,8 @@ class KMeansModel (@Since("1.0.0") val clusterCenters: Array[Vector], */ @Since("0.8.0") def predict(point: Vector): Int = { - distanceMeasureInstance.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1 + distanceMeasureInstance.findClosest(clusterCentersWithNorm, radii, + new VectorWithNorm(point))._1 } /** @@ -82,8 +90,10 @@ class KMeansModel (@Since("1.0.0") val clusterCenters: Array[Vector], @Since("1.0.0") def predict(points: RDD[Vector]): RDD[Int] = { val bcCentersWithNorm = points.context.broadcast(clusterCentersWithNorm) + val bcRadii = points.context.broadcast(radii) points.map(p => - distanceMeasureInstance.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1) + distanceMeasureInstance.findClosest(bcCentersWithNorm.value, + bcRadii.value, new VectorWithNorm(p))._1) } /** From c6b69ee0c96437e3f287a35cb28e44a6d223a8ce Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 3 Mar 2020 11:45:40 +0800 Subject: [PATCH 2/8] update update --- .../mllib/clustering/DistanceMeasure.scala | 171 ++++++++++-------- .../spark/mllib/clustering/KMeans.scala | 8 +- .../spark/mllib/clustering/KMeansModel.scala | 10 +- 3 files changed, 106 insertions(+), 83 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala index dd8f6482c070..c2f4eb59e711 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala @@ -25,47 +25,17 @@ import org.apache.spark.mllib.util.MLUtils private[spark] abstract class DistanceMeasure extends Serializable { /** - * Radii of centers used in triangle inequality to obtain useful bounds to find - * closest centers. - * - * @see Charles Elkan, - * Using the Triangle Inequality to Accelerate k-Means - * - * @return Radii of centers. If distance between point x and center c is less than - * the radius of center c, then center c is the closest center to point x. + * Statistics used in triangle inequality to obtain useful bounds to find closest centers. */ - def computeRadii(centers: Array[VectorWithNorm]): Array[Double] = { - val k = centers.length - Array.fill(k)(Double.NaN) - } + def computeStatistics(centers: Array[VectorWithNorm]): Array[Array[Double]] /** * @return the index of the closest center to the given point, as well as the cost. */ def findClosest( centers: Array[VectorWithNorm], - radii: Array[Double], - point: VectorWithNorm): (Int, Double) = { - var bestDistance = Double.PositiveInfinity - var bestIndex = 0 - var i = 0 - var found = false - while (i < centers.length && !found) { - val center = centers(i) - val d = distance(center, point) - val r = radii(i) - if (d < r) { - bestDistance = d - bestIndex = i - found = true - } else if (d < bestDistance) { - bestDistance = d - bestIndex = i - } - i += 1 - } - (bestIndex, bestDistance) - } + statistics: Array[Array[Double]], + point: VectorWithNorm): (Int, Double) /** * @return the index of the closest center to the given point, as well as the cost. @@ -200,30 +170,45 @@ object DistanceMeasure { private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { /** - * @return Radii of centers. If distance between point x and center c is less than - * the radius of center c, then center c is the closest center to point x. 
- * For Euclidean distance, radius of center c is half of the distance between - * center c and its closest center. + * Statistics used in triangle inequality to obtain useful bounds to find closest centers. + * @see Charles Elkan, + * Using the Triangle Inequality to Accelerate k-Means + * + * @return A symmetric matrix containing statistics, matrix(i)(j) represents: + * 1, squared radii of the center i, if i==j. If distance between point x and center i + * is less than the radius of center i, then center i is the closest center to point x. + * For Euclidean distance, radius of center i is half of the distance between center i + * and its closest center; + * 2, a lower bound r=matrix(i)(j) to help avoiding unnecessary distance computation. + * Given point x, let i be current closest center, and d be current best squared + * distance, if d < r, then we no longer need to compute the distance to center j. */ - override def computeRadii(centers: Array[VectorWithNorm]): Array[Double] = { + override def computeStatistics(centers: Array[VectorWithNorm]): Array[Array[Double]] = { val k = centers.length if (k == 1) { - Array(Double.NaN) + Array(Array(Double.NaN)) } else { - val distances = Array.fill(k)(Double.PositiveInfinity) + val matrix = Array.ofDim[Double](k, k) var i = 0 + while (i < k) { + matrix(i)(i) = Double.PositiveInfinity + i += 1 + } + i = 0 while (i < k) { var j = i + 1 while (j < k) { val d = distance(centers(i), centers(j)) - if (d < distances(i)) distances(i) = d - if (d < distances(j)) distances(j) = d + val r = 0.25 * d * d + matrix(i)(j) = r + matrix(j)(i) = r + if (r < matrix(i)(i)) matrix(i)(i) = r + if (r < matrix(j)(j)) matrix(j)(j) = r j += 1 } i += 1 } - - distances.map(_ / 2) + matrix } } @@ -232,25 +217,25 @@ private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { */ override def findClosest( centers: Array[VectorWithNorm], - radii: Array[Double], + statistics: Array[Array[Double]], point: VectorWithNorm): (Int, Double) = { - var bestDistance = Double.PositiveInfinity + var bestDistance = EuclideanDistanceMeasure.fastSquaredDistance(centers(0), point) + if (bestDistance < statistics(0)(0)) { + return (0, bestDistance) + } + var bestIndex = 0 - var i = 0 - var found = false - while (i < centers.length && !found) { + var i = 1 + while (i < centers.length) { val center = centers(i) // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary // distance computation. - var lowerBoundOfSqDist = center.norm - point.norm - lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist - if (lowerBoundOfSqDist < bestDistance) { + val normDiff = center.norm - point.norm + val lowerBound = normDiff * normDiff + if (lowerBound < bestDistance && statistics(i)(bestIndex) < bestDistance) { val d = EuclideanDistanceMeasure.fastSquaredDistance(center, point) - val r = radii(i) - if (d < r * r) { - bestDistance = d - bestIndex = i - found = true + if (d < statistics(i)(i)) { + return (i, d) } else if (d < bestDistance) { bestDistance = d bestIndex = i @@ -344,35 +329,77 @@ private[spark] object EuclideanDistanceMeasure { private[spark] class CosineDistanceMeasure extends DistanceMeasure { /** - * @return Radii of centers. If distance between point x and center c is less than - * the radius of center c, then center c is the closest center to point x. - * For Cosine distance, it is similar to Euclidean distance. 
However, here - * radian/angle is used instead of Cosine distance: for center c, finding - * its closest center, computing the radian/angle between them, halving the - * radian/angle, and converting it back to Cosine distance at the end. + * Statistics used in triangle inequality to obtain useful bounds to find closest centers. + * @return A symmetric matrix containing statistics, matrix(i)(j) represents: + * 1, squared radii of the center i, if i==j. If distance between point x and center i + * is less than the radius of center i, then center i is the closest center to point x. + * For Cosine distance, it is similar to Euclidean distance. However, here radian/angle + * is used instead of Cosine distance: for center c, finding its closest center, + * computing the radian/angle between them, halving it, and converting it back to Cosine + * distance at the end. + * 2, a lower bound r=matrix(i)(j) to help avoiding unnecessary distance computation. + * Given point x, let i be current closest center, and d be current best squared + * distance, if d < r, then we no longer need to compute the distance to center j. */ - override def computeRadii(centers: Array[VectorWithNorm]): Array[Double] = { + override def computeStatistics(centers: Array[VectorWithNorm]): Array[Array[Double]] = { val k = centers.length if (k == 1) { - Array(Double.NaN) + Array(Array(Double.NaN)) } else { - val distances = Array.fill(k)(Double.PositiveInfinity) + val matrix = Array.ofDim[Double](k, k) var i = 0 + while (i < k) { + matrix(i)(i) = Double.PositiveInfinity + i += 1 + } + i = 0 while (i < k) { var j = i + 1 while (j < k) { + // d = 1 - cos(x) + // r = 1 - cos(x/2) = 1 - sqrt((cos(x) + 1) / 2) = 1 - sqrt(1 - d/2) val d = distance(centers(i), centers(j)) - if (d < distances(i)) distances(i) = d - if (d < distances(j)) distances(j) = d + val r = 1 - math.sqrt(1 - d / 2) + matrix(i)(j) = r + matrix(j)(i) = r + if (r < matrix(i)(i)) matrix(i)(i) = r + if (r < matrix(j)(j)) matrix(j)(j) = r j += 1 } i += 1 } + matrix + } + } + + /** + * @return the index of the closest center to the given point, as well as the cost. 
+ */ + def findClosest( + centers: Array[VectorWithNorm], + statistics: Array[Array[Double]], + point: VectorWithNorm): (Int, Double) = { + var bestDistance = distance(centers(0), point) + if (bestDistance < statistics(0)(0)) { + return (0, bestDistance) + } - // d = 1 - cos(x) - // r = 1 - cos(x/2) = 1 - sqrt((cos(x) + 1) / 2) = 1 - sqrt(1 - d/2) - distances.map(d => 1 - math.sqrt(1 - d / 2)) + var bestIndex = 0 + var i = 1 + while (i < centers.length) { + if (statistics(i)(bestIndex) < bestDistance) { + val center = centers(i) + val d = distance(center, point) + if (d < statistics(i)(i)) { + return (i, d) + } else if (d < bestDistance) { + bestDistance = d + bestIndex = i + } + } + i += 1 } + (bestIndex, bestDistance) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 9091a319a44f..bf3c4e57cc85 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -266,7 +266,6 @@ class KMeans private ( var converged = false var cost = 0.0 var iteration = 0 - var totalRadiiTime = 0L val iterationStartTime = System.nanoTime() @@ -274,12 +273,10 @@ class KMeans private ( // Execute iterations of Lloyd's algorithm until converged while (iteration < maxIterations && !converged) { - val radiiStartTime = System.nanoTime() - val radii = distanceMeasureInstance.computeRadii(centers) - totalRadiiTime += System.nanoTime() - radiiStartTime + val statistics = distanceMeasureInstance.computeStatistics(centers) val costAccum = sc.doubleAccumulator - val bcCenters = sc.broadcast((centers, radii)) + val bcCenters = sc.broadcast((centers, statistics)) // Find the new centers val collected = data.mapPartitions { points => @@ -329,7 +326,6 @@ class KMeans private ( instr.foreach(_.logNamedValue(s"Cost@iter=$iteration", s"$cost")) iteration += 1 } - logInfo(f"Radii computation took ${totalRadiiTime / 1e9}%.3f seconds.") val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 9ee4f335b432..ef3101b0812d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -48,11 +48,11 @@ class KMeansModel (@Since("1.0.0") val clusterCenters: Array[Vector], @transient private lazy val clusterCentersWithNorm = if (clusterCenters == null) null else clusterCenters.map(new VectorWithNorm(_)) - // TODO: computation of radii may take seconds, so save it to KMeansModel in training - @transient private lazy val radii = if (clusterCenters == null) { + // TODO: computation of statistics may take seconds, so save it to KMeansModel in training + @transient private lazy val statistics = if (clusterCenters == null) { null } else { - distanceMeasureInstance.computeRadii(clusterCentersWithNorm) + distanceMeasureInstance.computeStatistics(clusterCentersWithNorm) } @Since("2.4.0") @@ -80,7 +80,7 @@ class KMeansModel (@Since("1.0.0") val clusterCenters: Array[Vector], */ @Since("0.8.0") def predict(point: Vector): Int = { - distanceMeasureInstance.findClosest(clusterCentersWithNorm, radii, + distanceMeasureInstance.findClosest(clusterCentersWithNorm, statistics, new 
VectorWithNorm(point))._1 } @@ -90,7 +90,7 @@ class KMeansModel (@Since("1.0.0") val clusterCenters: Array[Vector], @Since("1.0.0") def predict(points: RDD[Vector]): RDD[Int] = { val bcCentersWithNorm = points.context.broadcast(clusterCentersWithNorm) - val bcRadii = points.context.broadcast(radii) + val bcRadii = points.context.broadcast(statistics) points.map(p => distanceMeasureInstance.findClosest(bcCentersWithNorm.value, bcRadii.value, new VectorWithNorm(p))._1) From 488713edcfe449a0b755293f5eab40e9a167bf1e Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Fri, 20 Mar 2020 22:41:26 +0800 Subject: [PATCH 3/8] compute stats distributedly compute stats distributedly nit nit --- .../mllib/clustering/DistanceMeasure.scala | 156 +++++++++++------- .../spark/mllib/clustering/KMeans.scala | 28 ++-- 2 files changed, 115 insertions(+), 69 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala index c2f4eb59e711..4ca91259772b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala @@ -17,7 +17,9 @@ package org.apache.spark.mllib.clustering +import org.apache.spark.SparkContext import org.apache.spark.annotation.Since +import org.apache.spark.broadcast.Broadcast import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal} import org.apache.spark.mllib.util.MLUtils @@ -26,8 +28,93 @@ private[spark] abstract class DistanceMeasure extends Serializable { /** * Statistics used in triangle inequality to obtain useful bounds to find closest centers. + * @param distance distance between two centers */ - def computeStatistics(centers: Array[VectorWithNorm]): Array[Array[Double]] + def computeStatistics(distance: Double): Double + + /** + * Statistics used in triangle inequality to obtain useful bounds to find closest centers. + * + * @return A symmetric matrix containing statistics, matrix(i)(j) represents: + * 1, a lower bound r of the center i, if i==j. If distance between point x and center i + * is less than f(r), then center i is the closest center to point x. + * 2, a lower bound r=matrix(i)(j) to help avoiding unnecessary distance computation. + * Given point x, let i be current closest center, and d be current best distance, + * if d < f(r), then we no longer need to compute the distance to center j. + */ + def computeStatistics(centers: Array[VectorWithNorm]): Array[Array[Double]] = { + val k = centers.length + if (k == 1) return Array(Array(Double.NaN)) + + val stats = Array.ofDim[Double](k, k) + var i = 0 + while (i < k) { + stats(i)(i) = Double.PositiveInfinity + i += 1 + } + i = 0 + while (i < k) { + var j = i + 1 + while (j < k) { + val d = distance(centers(i), centers(j)) + val s = computeStatistics(d) + stats(i)(j) = s + stats(j)(i) = s + if (s < stats(i)(i)) stats(i)(i) = s + if (s < stats(j)(j)) stats(j)(j) = s + j += 1 + } + i += 1 + } + stats + } + + /** + * Compute distance between centers in a distributed way. 
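+   * Computing all k * (k - 1) / 2 pairwise statistics is quadratic in the number of
+   * centers, so the pairs are hashed across at most 1024 partitions and computed on
+   * the executors. The KMeans changes in this patch call it as:
+   *   val bcCenters = sc.broadcast(centers)
+   *   val stats = computeStatisticsDistributedly(sc, bcCenters)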
+ */ + def computeStatisticsDistributedly( + sc: SparkContext, + bcCenters: Broadcast[Array[VectorWithNorm]]): Array[Array[Double]] = { + val k = bcCenters.value.length + if (k == 1) return Array(Array(Double.NaN)) + + val numParts = math.min(k, 1024) + val collected = sc.range(0, numParts, 1, numParts) + .mapPartitionsWithIndex { case (pid, _) => + val centers = bcCenters.value + Iterator.range(0, k).flatMap { i => + Iterator.range(i + 1, k).flatMap { j => + val hash = (i, j).hashCode.abs + if (hash % numParts == pid) { + val d = distance(centers(i), centers(j)) + val s = computeStatistics(d) + Iterator.single(((i, j), s)) + } else Iterator.empty + } + }.filterNot(_._2 == 0) + }.collectAsMap() + + val stats = Array.ofDim[Double](k, k) + var i = 0 + while (i < k) { + stats(i)(i) = Double.PositiveInfinity + i += 1 + } + i = 0 + while (i < k) { + var j = i + 1 + while (j < k) { + val s = collected.getOrElse((i, j), 0.0) + stats(i)(j) = s + stats(j)(i) = s + if (s < stats(i)(i)) stats(i)(i) = s + if (s < stats(j)(j)) stats(j)(j) = s + j += 1 + } + i += 1 + } + stats + } /** * @return the index of the closest center to the given point, as well as the cost. @@ -174,7 +261,7 @@ private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { * @see Charles Elkan, * Using the Triangle Inequality to Accelerate k-Means * - * @return A symmetric matrix containing statistics, matrix(i)(j) represents: + * @return One element used in statistics matrix to make matrix(i)(j) represents: * 1, squared radii of the center i, if i==j. If distance between point x and center i * is less than the radius of center i, then center i is the closest center to point x. * For Euclidean distance, radius of center i is half of the distance between center i @@ -183,33 +270,8 @@ private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { * Given point x, let i be current closest center, and d be current best squared * distance, if d < r, then we no longer need to compute the distance to center j. */ - override def computeStatistics(centers: Array[VectorWithNorm]): Array[Array[Double]] = { - val k = centers.length - if (k == 1) { - Array(Array(Double.NaN)) - } else { - val matrix = Array.ofDim[Double](k, k) - var i = 0 - while (i < k) { - matrix(i)(i) = Double.PositiveInfinity - i += 1 - } - i = 0 - while (i < k) { - var j = i + 1 - while (j < k) { - val d = distance(centers(i), centers(j)) - val r = 0.25 * d * d - matrix(i)(j) = r - matrix(j)(i) = r - if (r < matrix(i)(i)) matrix(i)(i) = r - if (r < matrix(j)(j)) matrix(j)(j) = r - j += 1 - } - i += 1 - } - matrix - } + override def computeStatistics(distance: Double): Double = { + 0.25 * distance * distance } /** @@ -330,7 +392,8 @@ private[spark] class CosineDistanceMeasure extends DistanceMeasure { /** * Statistics used in triangle inequality to obtain useful bounds to find closest centers. - * @return A symmetric matrix containing statistics, matrix(i)(j) represents: + * + * @return One element used in statistics matrix to make matrix(i)(j) represents: * 1, squared radii of the center i, if i==j. If distance between point x and center i * is less than the radius of center i, then center i is the closest center to point x. * For Cosine distance, it is similar to Euclidean distance. 
However, here radian/angle @@ -341,35 +404,10 @@ private[spark] class CosineDistanceMeasure extends DistanceMeasure { * Given point x, let i be current closest center, and d be current best squared * distance, if d < r, then we no longer need to compute the distance to center j. */ - override def computeStatistics(centers: Array[VectorWithNorm]): Array[Array[Double]] = { - val k = centers.length - if (k == 1) { - Array(Array(Double.NaN)) - } else { - val matrix = Array.ofDim[Double](k, k) - var i = 0 - while (i < k) { - matrix(i)(i) = Double.PositiveInfinity - i += 1 - } - i = 0 - while (i < k) { - var j = i + 1 - while (j < k) { - // d = 1 - cos(x) - // r = 1 - cos(x/2) = 1 - sqrt((cos(x) + 1) / 2) = 1 - sqrt(1 - d/2) - val d = distance(centers(i), centers(j)) - val r = 1 - math.sqrt(1 - d / 2) - matrix(i)(j) = r - matrix(j)(i) = r - if (r < matrix(i)(i)) matrix(i)(i) = r - if (r < matrix(j)(j)) matrix(j)(j) = r - j += 1 - } - i += 1 - } - matrix - } + override def computeStatistics(distance: Double): Double = { + // d = 1 - cos(x) + // r = 1 - cos(x/2) = 1 - sqrt((cos(x) + 1) / 2) = 1 - sqrt(1 - d/2) + 1 - math.sqrt(1 - distance / 2) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index bf3c4e57cc85..1c5de5a092d6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -209,9 +209,7 @@ class KMeans private ( */ @Since("0.8.0") def run(data: RDD[Vector]): KMeansModel = { - val instances: RDD[(Vector, Double)] = data.map { - case (point) => (point, 1.0) - } + val instances = data.map(point => (point, 1.0)) runWithWeight(instances, None) } @@ -260,6 +258,7 @@ class KMeans private ( initKMeansParallel(data, distanceMeasureInstance) } } + val numFeatures = centers.head.vector.size val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") @@ -269,18 +268,26 @@ class KMeans private ( val iterationStartTime = System.nanoTime() - instr.foreach(_.logNumFeatures(centers.head.vector.size)) + instr.foreach(_.logNumFeatures(numFeatures)) + + val shouldDistributed = centers.length * centers.length * numFeatures.toLong > 1000000L // Execute iterations of Lloyd's algorithm until converged while (iteration < maxIterations && !converged) { - val statistics = distanceMeasureInstance.computeStatistics(centers) + val bcCenters = sc.broadcast(centers) + val stats = if (shouldDistributed) { + distanceMeasureInstance.computeStatisticsDistributedly(sc, bcCenters) + } else { + distanceMeasureInstance.computeStatistics(centers) + } + val bcStats = sc.broadcast(stats) val costAccum = sc.doubleAccumulator - val bcCenters = sc.broadcast((centers, statistics)) // Find the new centers val collected = data.mapPartitions { points => - val (centers, radii) = bcCenters.value + val centers = bcCenters.value + val stats = bcStats.value val dims = centers.head.vector.size val sums = Array.fill(centers.length)(Vectors.zeros(dims)) @@ -291,14 +298,14 @@ class KMeans private ( val clusterWeightSum = Array.ofDim[Double](centers.length) points.foreach { point => - val (bestCenter, cost) = distanceMeasureInstance.findClosest(centers, radii, point) + val (bestCenter, cost) = distanceMeasureInstance.findClosest(centers, stats, point) costAccum.add(cost * point.weight) distanceMeasureInstance.updateClusterSum(point, sums(bestCenter)) 
clusterWeightSum(bestCenter) += point.weight } - clusterWeightSum.indices.filter(clusterWeightSum(_) > 0) - .map(j => (j, (sums(j), clusterWeightSum(j)))).iterator + Iterator.tabulate(centers.length)(j => (j, (sums(j), clusterWeightSum(j)))) + .filter(_._2._2 > 0) }.reduceByKey { (sumweight1, sumweight2) => axpy(1.0, sumweight2._1, sumweight1._1) (sumweight1._1, sumweight1._2 + sumweight2._2) @@ -310,6 +317,7 @@ class KMeans private ( } bcCenters.destroy() + bcStats.destroy() // Update the cluster centers and costs converged = true From c9a720f7f57770a9fb91108a757a4b342fb0388b Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Fri, 17 Apr 2020 15:22:04 +0800 Subject: [PATCH 4/8] package upper tri package upper tri package upper tri package upper tri --- .../org/apache/spark/ml/impl/Utils.scala | 53 ++++++++- .../spark/ml/clustering/GaussianMixture.scala | 16 +-- .../mllib/clustering/DistanceMeasure.scala | 110 ++++++++++-------- 3 files changed, 113 insertions(+), 66 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala b/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala index 112de982e463..ee3e99c0a8a5 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.impl -private[ml] object Utils { +private[spark] object Utils { lazy val EPSILON = { var eps = 1.0 @@ -27,4 +27,55 @@ private[ml] object Utils { } eps } + + /** + * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix + * into an n * n array representing the full symmetric matrix (column major). + * + * @param n The order of the n by n matrix. + * @param triangularValues The upper triangular part of the matrix packed in an array + * (column major). + * @return A dense matrix which represents the symmetric matrix in column major. + */ + def unpackUpperTriangular( + n: Int, + triangularValues: Array[Double]): Array[Double] = { + val symmetricValues = new Array[Double](n * n) + var r = 0 + var i = 0 + while (i < n) { + var j = 0 + while (j <= i) { + symmetricValues(i * n + j) = triangularValues(r) + symmetricValues(j * n + i) = triangularValues(r) + r += 1 + j += 1 + } + i += 1 + } + symmetricValues + } + + /** + * Indexing in an array representing the upper triangular part of a matrix + * into an n * n array representing the full symmetric matrix (column major). + * val symmetricValues = unpackUpperTriangularMatrix(n, triangularValues) + * val matrix = new DenseMatrix(n, n, symmetricValues) + * val index = indexUpperTriangularMatrix(n, i, j) + * then: symmetricValues(index) == matrix(i, j) + * + * @param n The order of the n by n matrix. 
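+   * @param i The row index of the entry in the symmetric matrix.
+   * @param j The column index of the entry in the symmetric matrix.
+   * @return The index of the corresponding entry in the packed upper triangular array.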
+ */ + def indexUpperTriangular( + n: Int, + i: Int, + j: Int): Int = { + require(i >= 0 && i < n, s"Expected 0 <= i < $n, got i = $i.") + require(j >= 0 && j < n, s"Expected 0 <= j < $n, got j = $j.") + if (i <= j) { + j * (j + 1) / 2 + i + } else { + i * (i + 1) / 2 + j + } + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index f490faf084d2..1c4560aa5fdd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Since import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.ml.impl.Utils.EPSILON +import org.apache.spark.ml.impl.Utils.{unpackUpperTriangular, EPSILON} import org.apache.spark.ml.linalg._ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ @@ -583,19 +583,7 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] { private[clustering] def unpackUpperTriangularMatrix( n: Int, triangularValues: Array[Double]): DenseMatrix = { - val symmetricValues = new Array[Double](n * n) - var r = 0 - var i = 0 - while (i < n) { - var j = 0 - while (j <= i) { - symmetricValues(i * n + j) = triangularValues(r) - symmetricValues(j * n + i) = triangularValues(r) - r += 1 - j += 1 - } - i += 1 - } + val symmetricValues = unpackUpperTriangular(n, triangularValues) new DenseMatrix(n, n, symmetricValues) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala index 4ca91259772b..7729e2a0f821 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala @@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering import org.apache.spark.SparkContext import org.apache.spark.annotation.Since import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.impl.Utils.indexUpperTriangular import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal} import org.apache.spark.mllib.util.MLUtils @@ -35,38 +36,42 @@ private[spark] abstract class DistanceMeasure extends Serializable { /** * Statistics used in triangle inequality to obtain useful bounds to find closest centers. * - * @return A symmetric matrix containing statistics, matrix(i)(j) represents: + * @return The upper triangular part of a symmetric matrix containing statistics, matrix(i)(j) + * represents: * 1, a lower bound r of the center i, if i==j. If distance between point x and center i * is less than f(r), then center i is the closest center to point x. * 2, a lower bound r=matrix(i)(j) to help avoiding unnecessary distance computation. * Given point x, let i be current closest center, and d be current best distance, * if d < f(r), then we no longer need to compute the distance to center j. 
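   *         The matrix is returned as a packed upper triangular array; for example:
   *           val stats = computeStatistics(centers)
   *           val s = stats(indexUpperTriangular(k, i, j))  // statistics for pair (i, j)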
*/ - def computeStatistics(centers: Array[VectorWithNorm]): Array[Array[Double]] = { + def computeStatistics(centers: Array[VectorWithNorm]): Array[Double] = { val k = centers.length - if (k == 1) return Array(Array(Double.NaN)) + if (k == 1) return Array(Double.NaN) - val stats = Array.ofDim[Double](k, k) + val packedValues = Array.ofDim[Double](k * (k + 1) / 2) + val diagValues = Array.fill(k)(Double.PositiveInfinity) var i = 0 - while (i < k) { - stats(i)(i) = Double.PositiveInfinity - i += 1 - } - i = 0 while (i < k) { var j = i + 1 while (j < k) { val d = distance(centers(i), centers(j)) val s = computeStatistics(d) - stats(i)(j) = s - stats(j)(i) = s - if (s < stats(i)(i)) stats(i)(i) = s - if (s < stats(j)(j)) stats(j)(j) = s + val index = indexUpperTriangular(k, i, j) + packedValues(index) = s + if (s < diagValues(i)) diagValues(i) = s + if (s < diagValues(j)) diagValues(j) = s j += 1 } i += 1 } - stats + + i = 0 + while (i < k) { + val index = indexUpperTriangular(k, i, i) + packedValues(index) = diagValues(i) + i += 1 + } + packedValues } /** @@ -74,12 +79,15 @@ private[spark] abstract class DistanceMeasure extends Serializable { */ def computeStatisticsDistributedly( sc: SparkContext, - bcCenters: Broadcast[Array[VectorWithNorm]]): Array[Array[Double]] = { + bcCenters: Broadcast[Array[VectorWithNorm]]): Array[Double] = { val k = bcCenters.value.length - if (k == 1) return Array(Array(Double.NaN)) + if (k == 1) return Array(Double.NaN) + + val packedValues = Array.ofDim[Double](k * (k + 1) / 2) + val diagValues = Array.fill(k)(Double.PositiveInfinity) val numParts = math.min(k, 1024) - val collected = sc.range(0, numParts, 1, numParts) + sc.range(0, numParts, 1, numParts) .mapPartitionsWithIndex { case (pid, _) => val centers = bcCenters.value Iterator.range(0, k).flatMap { i => @@ -88,32 +96,24 @@ private[spark] abstract class DistanceMeasure extends Serializable { if (hash % numParts == pid) { val d = distance(centers(i), centers(j)) val s = computeStatistics(d) - Iterator.single(((i, j), s)) + Iterator.single((i, j, s)) } else Iterator.empty } }.filterNot(_._2 == 0) - }.collectAsMap() + }.foreach { case (i, j, s) => + val index = indexUpperTriangular(k, i, j) + packedValues(index) = s + if (s < diagValues(i)) diagValues(i) = s + if (s < diagValues(j)) diagValues(j) = s + } - val stats = Array.ofDim[Double](k, k) var i = 0 while (i < k) { - stats(i)(i) = Double.PositiveInfinity - i += 1 - } - i = 0 - while (i < k) { - var j = i + 1 - while (j < k) { - val s = collected.getOrElse((i, j), 0.0) - stats(i)(j) = s - stats(j)(i) = s - if (s < stats(i)(i)) stats(i)(i) = s - if (s < stats(j)(j)) stats(j)(j) = s - j += 1 - } + val index = indexUpperTriangular(k, i, i) + packedValues(index) = diagValues(i) i += 1 } - stats + packedValues } /** @@ -121,7 +121,7 @@ private[spark] abstract class DistanceMeasure extends Serializable { */ def findClosest( centers: Array[VectorWithNorm], - statistics: Array[Array[Double]], + statistics: Array[Double], point: VectorWithNorm): (Int, Double) /** @@ -279,28 +279,33 @@ private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { */ override def findClosest( centers: Array[VectorWithNorm], - statistics: Array[Array[Double]], + statistics: Array[Double], point: VectorWithNorm): (Int, Double) = { var bestDistance = EuclideanDistanceMeasure.fastSquaredDistance(centers(0), point) - if (bestDistance < statistics(0)(0)) { + if (bestDistance < statistics(0)) { return (0, bestDistance) } + val k = centers.length var bestIndex = 0 var i = 1 - 
while (i < centers.length) { + while (i < k) { val center = centers(i) // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary // distance computation. val normDiff = center.norm - point.norm val lowerBound = normDiff * normDiff - if (lowerBound < bestDistance && statistics(i)(bestIndex) < bestDistance) { - val d = EuclideanDistanceMeasure.fastSquaredDistance(center, point) - if (d < statistics(i)(i)) { - return (i, d) - } else if (d < bestDistance) { - bestDistance = d - bestIndex = i + if (lowerBound < bestDistance) { + val index1 = indexUpperTriangular(k, i, bestIndex) + if (statistics(index1) < bestDistance) { + val d = EuclideanDistanceMeasure.fastSquaredDistance(center, point) + val index2 = indexUpperTriangular(k, i, i) + if (d < statistics(index2)) { + return (i, d) + } else if (d < bestDistance) { + bestDistance = d + bestIndex = i + } } } i += 1 @@ -415,20 +420,23 @@ private[spark] class CosineDistanceMeasure extends DistanceMeasure { */ def findClosest( centers: Array[VectorWithNorm], - statistics: Array[Array[Double]], + statistics: Array[Double], point: VectorWithNorm): (Int, Double) = { var bestDistance = distance(centers(0), point) - if (bestDistance < statistics(0)(0)) { + if (bestDistance < statistics(0)) { return (0, bestDistance) } + val k = centers.length var bestIndex = 0 var i = 1 - while (i < centers.length) { - if (statistics(i)(bestIndex) < bestDistance) { + while (i < k) { + val index1 = indexUpperTriangular(k, i, bestIndex) + if (statistics(index1) < bestDistance) { val center = centers(i) val d = distance(center, point) - if (d < statistics(i)(i)) { + val index2 = indexUpperTriangular(k, i, i) + if (d < statistics(index2)) { return (i, d) } else if (d < bestDistance) { bestDistance = d From c48183ff61f54f5cd2cdcd6f95e6929b3130e2da Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Fri, 17 Apr 2020 16:12:22 +0800 Subject: [PATCH 5/8] need collect need collect need collect --- .../org/apache/spark/mllib/clustering/DistanceMeasure.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala index 7729e2a0f821..ab6dc0ad4529 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala @@ -100,7 +100,7 @@ private[spark] abstract class DistanceMeasure extends Serializable { } else Iterator.empty } }.filterNot(_._2 == 0) - }.foreach { case (i, j, s) => + }.collect.foreach { case (i, j, s) => val index = indexUpperTriangular(k, i, j) packedValues(index) = s if (s < diagValues(i)) diagValues(i) = s From b05097301a574cf2d181c46e02c5e691c30782f2 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 20 Apr 2020 17:16:14 +0800 Subject: [PATCH 6/8] add testsuite for statistics --- .../mllib/clustering/DistanceMeasure.scala | 2 +- .../clustering/DistanceMeasureSuite.scala | 77 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/clustering/DistanceMeasureSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala index ab6dc0ad4529..11b2e4380f97 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala @@ -99,7 +99,7 @@ private[spark] abstract class DistanceMeasure extends Serializable { Iterator.single((i, j, s)) } else Iterator.empty } - }.filterNot(_._2 == 0) + } }.collect.foreach { case (i, j, s) => val index = indexUpperTriangular(k, i, j) packedValues(index) = s diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/DistanceMeasureSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/DistanceMeasureSuite.scala new file mode 100644 index 000000000000..73691c4ecb50 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/DistanceMeasureSuite.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.util.Random + +import org.apache.spark.SparkFunSuite +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ + +class DistanceMeasureSuite extends SparkFunSuite with MLlibTestSparkContext { + + private val seed = 42 + private val k = 10 + private val dim = 8 + + private var centers: Array[VectorWithNorm] = _ + + private var data: Array[VectorWithNorm] = _ + + override def beforeAll(): Unit = { + super.beforeAll() + + val rng = new Random(seed) + + centers = Array.tabulate(k) { i => + val values = Array.fill(dim)(rng.nextGaussian) + new VectorWithNorm(Vectors.dense(values)) + } + + data = Array.tabulate(1000) { i => + val values = Array.fill(dim)(rng.nextGaussian) + new VectorWithNorm(Vectors.dense(values)) + } + } + + test("predict with statistics") { + Seq(DistanceMeasure.COSINE, DistanceMeasure.EUCLIDEAN).foreach { distanceMeasure => + val distance = DistanceMeasure.decodeFromString(distanceMeasure) + val statistics = distance.computeStatistics(centers) + data.foreach { point => + val (index1, cost1) = distance.findClosest(centers, point) + val (index2, cost2) = distance.findClosest(centers, statistics, point) + assert(index1 == index2) + assert(cost1 ~== cost2 relTol 1E-10) + } + } + } + + test("compute statistics distributedly") { + Seq(DistanceMeasure.COSINE, DistanceMeasure.EUCLIDEAN).foreach { distanceMeasure => + val distance = DistanceMeasure.decodeFromString(distanceMeasure) + val statistics1 = distance.computeStatistics(centers) + val sc = spark.sparkContext + val bcCenters = sc.broadcast(centers) + val statistics2 = distance.computeStatisticsDistributedly(sc, bcCenters) + bcCenters.destroy() + assert(Vectors.dense(statistics1) ~== Vectors.dense(statistics2) relTol 1E-10) + } + } +} From bb4539bee84aa5832771cba45785cfe6b49f95a0 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 20 Apr 2020 17:42:19 +0800 Subject: [PATCH 7/8] nit --- 
.../scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index ef3101b0812d..04a3b6dd413b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -90,10 +90,10 @@ class KMeansModel (@Since("1.0.0") val clusterCenters: Array[Vector], @Since("1.0.0") def predict(points: RDD[Vector]): RDD[Int] = { val bcCentersWithNorm = points.context.broadcast(clusterCentersWithNorm) - val bcRadii = points.context.broadcast(statistics) + val bcStatistics = points.context.broadcast(statistics) points.map(p => distanceMeasureInstance.findClosest(bcCentersWithNorm.value, - bcRadii.value, new VectorWithNorm(p))._1) + bcStatistics.value, new VectorWithNorm(p))._1) } /** From d31d488e0e48a82fd5b43c406f07b8c7d27dd53c Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 21 Apr 2020 14:58:07 +0800 Subject: [PATCH 8/8] adress comments --- .../mllib/clustering/DistanceMeasure.scala | 71 +++++++++---------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala index 11b2e4380f97..bffed61c291e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala @@ -36,13 +36,14 @@ private[spark] abstract class DistanceMeasure extends Serializable { /** * Statistics used in triangle inequality to obtain useful bounds to find closest centers. * - * @return The upper triangular part of a symmetric matrix containing statistics, matrix(i)(j) - * represents: - * 1, a lower bound r of the center i, if i==j. If distance between point x and center i - * is less than f(r), then center i is the closest center to point x. - * 2, a lower bound r=matrix(i)(j) to help avoiding unnecessary distance computation. - * Given point x, let i be current closest center, and d be current best distance, - * if d < f(r), then we no longer need to compute the distance to center j. + * @return The packed upper triangular part of a symmetric matrix containing statistics, + * matrix(i,j) represents: + * 1, if i != j: a bound r = matrix(i,j) to help avoiding unnecessary distance + * computation. Given point x, let i be current closest center, and d be current best + * distance, if d < f(r), then we no longer need to compute the distance to center j; + * 2, if i == j: a bound r = matrix(i,i) = min_k{maxtrix(i,k)|k!=i}. If distance + * between point x and center i is less than f(r), then center i is the closest center + * to point x. */ def computeStatistics(centers: Array[VectorWithNorm]): Array[Double] = { val k = centers.length @@ -261,14 +262,15 @@ private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { * @see Charles Elkan, * Using the Triangle Inequality to Accelerate k-Means * - * @return One element used in statistics matrix to make matrix(i)(j) represents: - * 1, squared radii of the center i, if i==j. If distance between point x and center i - * is less than the radius of center i, then center i is the closest center to point x. 
- * For Euclidean distance, radius of center i is half of the distance between center i - * and its closest center; - * 2, a lower bound r=matrix(i)(j) to help avoiding unnecessary distance computation. - * Given point x, let i be current closest center, and d be current best squared - * distance, if d < r, then we no longer need to compute the distance to center j. + * @return One element used in statistics matrix to make matrix(i,j) represents: + * 1, if i != j: a bound r = matrix(i,j) to help avoiding unnecessary distance + * computation. Given point x, let i be current closest center, and d be current best + * squared distance, if d < r, then we no longer need to compute the distance to center + * j. matrix(i,j) equals to squared of half of Euclidean distance between centers i + * and j; + * 2, if i == j: a bound r = matrix(i,i) = min_k{maxtrix(i,k)|k!=i}. If squared + * distance between point x and center i is less than r, then center i is the closest + * center to point x. */ override def computeStatistics(distance: Double): Double = { 0.25 * distance * distance @@ -282,9 +284,7 @@ private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { statistics: Array[Double], point: VectorWithNorm): (Int, Double) = { var bestDistance = EuclideanDistanceMeasure.fastSquaredDistance(centers(0), point) - if (bestDistance < statistics(0)) { - return (0, bestDistance) - } + if (bestDistance < statistics(0)) return (0, bestDistance) val k = centers.length var bestIndex = 0 @@ -300,9 +300,8 @@ private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { if (statistics(index1) < bestDistance) { val d = EuclideanDistanceMeasure.fastSquaredDistance(center, point) val index2 = indexUpperTriangular(k, i, i) - if (d < statistics(index2)) { - return (i, d) - } else if (d < bestDistance) { + if (d < statistics(index2)) return (i, d) + if (d < bestDistance) { bestDistance = d bestIndex = i } @@ -398,16 +397,17 @@ private[spark] class CosineDistanceMeasure extends DistanceMeasure { /** * Statistics used in triangle inequality to obtain useful bounds to find closest centers. * - * @return One element used in statistics matrix to make matrix(i)(j) represents: - * 1, squared radii of the center i, if i==j. If distance between point x and center i - * is less than the radius of center i, then center i is the closest center to point x. - * For Cosine distance, it is similar to Euclidean distance. However, here radian/angle - * is used instead of Cosine distance: for center c, finding its closest center, - * computing the radian/angle between them, halving it, and converting it back to Cosine - * distance at the end. - * 2, a lower bound r=matrix(i)(j) to help avoiding unnecessary distance computation. - * Given point x, let i be current closest center, and d be current best squared - * distance, if d < r, then we no longer need to compute the distance to center j. + * @return One element used in statistics matrix to make matrix(i,j) represents: + * 1, if i != j: a bound r = matrix(i,j) to help avoiding unnecessary distance + * computation. Given point x, let i be current closest center, and d be current best + * squared distance, if d < r, then we no longer need to compute the distance to center + * j. For Cosine distance, it is similar to Euclidean distance. 
However, radian/angle
+   *         is used instead of Cosine distance to compute matrix(i,j): for centers i and j,
+   *         compute the radian/angle between them, halve it, and convert it back to Cosine
+   *         distance at the end;
+   *         2, if i == j: a bound r = matrix(i,i) = min_k{matrix(i,k)|k!=i}. If Cosine
+   *         distance between point x and center i is less than r, then center i is the closest
+   *         center to point x.
    */
   override def computeStatistics(distance: Double): Double = {
     // d = 1 - cos(x)
     // r = 1 - cos(x/2) = 1 - sqrt((cos(x) + 1) / 2) = 1 - sqrt(1 - d/2)
     1 - math.sqrt(1 - distance / 2)
   }
 
   /**
    * @return the index of the closest center to the given point, as well as the cost.
    */
   def findClosest(
       centers: Array[VectorWithNorm],
       statistics: Array[Double],
       point: VectorWithNorm): (Int, Double) = {
     var bestDistance = distance(centers(0), point)
-    if (bestDistance < statistics(0)) {
-      return (0, bestDistance)
-    }
+    if (bestDistance < statistics(0)) return (0, bestDistance)
 
     val k = centers.length
     var bestIndex = 0
     var i = 1
     while (i < k) {
       val index1 = indexUpperTriangular(k, i, bestIndex)
       if (statistics(index1) < bestDistance) {
         val center = centers(i)
         val d = distance(center, point)
         val index2 = indexUpperTriangular(k, i, i)
-        if (d < statistics(index2)) {
-          return (i, d)
-        } else if (d < bestDistance) {
+        if (d < statistics(index2)) return (i, d)
+        if (d < bestDistance) {
           bestDistance = d
           bestIndex = i
         }