@@ -54,7 +54,13 @@ class NaiveBayesModel private[mllib] (
}
}

override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
override def predict(testData: RDD[Vector]): RDD[Double] = {
val bcModel = testData.context.broadcast(this)
testData.mapPartitions { iter =>
val model = bcModel.value
iter.map(model.predict)
}
}

override def predict(testData: Vector): Double = {
labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
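
The hunk above is the core pattern of this patch: the old testData.map(predict) is a method reference, so it captured the whole NaiveBayesModel (including its pi and theta arrays) in every task closure, while the new version broadcasts the model once and reads it back inside mapPartitions. A minimal, self-contained sketch of the same pattern on a made-up model class (ToyModel and scorePoint are illustrative, not MLlib API):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Vector

    // Illustrative model: large enough that shipping it with every task would hurt.
    class ToyModel(val weights: Array[Double]) extends Serializable {

      def scorePoint(v: Vector): Double =
        v.toArray.zip(weights).map { case (x, w) => x * w }.sum

      def score(points: RDD[Vector]): RDD[Double] = {
        // Broadcast once per call; each task then carries only the small broadcast handle.
        val bcModel = points.context.broadcast(this)
        points.mapPartitions { iter =>
          val model = bcModel.value // fetched once per executor and cached there
          iter.map(model.scorePoint)
        }
      }
    }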
@@ -165,18 +165,21 @@ class KMeans private (
val activeCenters = activeRuns.map(r => centers(r)).toArray
val costAccums = activeRuns.map(_ => sc.accumulator(0.0))

val bcActiveCenters = sc.broadcast(activeCenters)

// Find the sum and count of points mapping to each center
val totalContribs = data.mapPartitions { points =>
val runs = activeCenters.length
val k = activeCenters(0).length
val dims = activeCenters(0)(0).vector.length
val thisActiveCenters = bcActiveCenters.value
val runs = thisActiveCenters.length
val k = thisActiveCenters(0).length
val dims = thisActiveCenters(0)(0).vector.length

val sums = Array.fill(runs, k)(BDV.zeros[Double](dims).asInstanceOf[BV[Double]])
val counts = Array.fill(runs, k)(0L)

points.foreach { point =>
(0 until runs).foreach { i =>
val (bestCenter, cost) = KMeans.findClosest(activeCenters(i), point)
val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
costAccums(i) += cost
sums(i)(bestCenter) += point.vector
counts(i)(bestCenter) += 1
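
One detail the hunk depends on: bcActiveCenters.value is called inside the mapPartitions closure, so the centers are resolved on the executors. Calling .value on the driver and letting the closure capture the resulting array would put the full payload back into every task, which is exactly what the change is trying to avoid. A runnable sketch of the two placements, with made-up sizes (not code from the PR):

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastValuePlacement {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("bc-placement"))
        val bigArray = Array.fill(1000000)(1.0) // stands in for activeCenters
        val rdd = sc.parallelize(0 until 1000, 4)
        val bc = sc.broadcast(bigArray)

        // Good: the closure holds only the broadcast handle; executors fetch the array once.
        rdd.mapPartitions { iter =>
          val local = bc.value
          iter.map(i => local(i % local.length))
        }.count()

        // Bad: resolving the broadcast on the driver re-serializes the array with every task.
        val resolved = bc.value
        rdd.map(i => resolved(i % resolved.length)).count()

        sc.stop()
      }
    }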
@@ -264,16 +267,17 @@ class KMeans private (
// to their squared distance from that run's current centers
var step = 0
while (step < initializationSteps) {
val bcCenters = data.context.broadcast(centers)
val sumCosts = data.flatMap { point =>
(0 until runs).map { r =>
(r, KMeans.pointCost(centers(r), point))
(r, KMeans.pointCost(bcCenters.value(r), point))
}
}.reduceByKey(_ + _).collectAsMap()
val chosen = data.mapPartitionsWithIndex { (index, points) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
points.flatMap { p =>
(0 until runs).filter { r =>
rand.nextDouble() < 2.0 * KMeans.pointCost(centers(r), p) * k / sumCosts(r)
rand.nextDouble() < 2.0 * KMeans.pointCost(bcCenters.value(r), p) * k / sumCosts(r)
}.map((_, p))
}
}.collect()
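
Because centers grows on every pass of the initialization loop, the code above has to create a fresh bcCenters per step. Old broadcasts are not needed once a step finishes, so they could also be released explicitly; a sketch of that per-iteration shape is below (generic names, and it assumes Broadcast.unpersist is available in the Spark version at hand):

    import org.apache.spark.{SparkConf, SparkContext}

    object PerStepBroadcast {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("per-step-bc"))
        val data = sc.parallelize(1 to 1000, 4).map(_.toDouble).cache()

        var state = Array.fill(5)(0.0) // stands in for the evolving set of centers
        var step = 0
        while (step < 3) {
          val bcState = sc.broadcast(state)
          // Each pass reads the current state through the broadcast, as the k-means|| loop does.
          val shift = data.map(x => x + bcState.value.sum).reduce(_ + _) / data.count()
          state = state.map(_ + shift)
          // Drop the stale copy so old broadcasts do not pile up on the executors.
          bcState.unpersist(blocking = false)
          step += 1
        }
        sc.stop()
      }
    }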
@@ -286,9 +290,10 @@ class KMeans private (
// Finally, we might have a set of more than k candidate centers for each run; weigh each
// candidate by the number of points in the dataset mapping to it and run a local k-means++
// on the weighted centers to pick just k of them
val bcCenters = data.context.broadcast(centers)
val weightMap = data.flatMap { p =>
(0 until runs).map { r =>
((r, KMeans.findClosest(centers(r), p)._1), 1.0)
((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
}
}.reduceByKey(_ + _).collectAsMap()
val finalCenters = (0 until runs).map { r =>
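
To get a feel for what broadcasting saves here, consider the payload each task would otherwise carry: all runs' centers, that is runs x k vectors of dims doubles. A back-of-the-envelope calculation with illustrative numbers (not taken from the PR):

    // Illustrative sizing only: 10 parallel runs, k = 500 centers, 10,000-dimensional points.
    val runs = 10
    val k = 500
    val dims = 10000
    val centerBytes = runs.toLong * k * dims * 8 // 400,000,000 bytes, roughly 400 MB
    // Without the broadcast, this payload is re-serialized into every task of every job that
    // touches the centers; with it, each executor fetches the centers once per broadcast.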
@@ -38,7 +38,8 @@ class KMeansModel private[mllib] (val clusterCenters: Array[Vector]) extends Ser
/** Maps given points to their cluster indices. */
def predict(points: RDD[Vector]): RDD[Int] = {
val centersWithNorm = clusterCentersWithNorm
points.map(p => KMeans.findClosest(centersWithNorm, new BreezeVectorWithNorm(p))._1)
val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))._1)
}

/** Maps given points to their cluster indices. */
@@ -51,7 +52,8 @@ class KMeansModel private[mllib] (val clusterCenters: Array[Vector]) extends Ser
*/
def computeCost(data: RDD[Vector]): Double = {
val centersWithNorm = clusterCentersWithNorm
data.map(p => KMeans.pointCost(centersWithNorm, new BreezeVectorWithNorm(p))).sum()
val bcCentersWithNorm = data.context.broadcast(centersWithNorm)
data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))).sum()
}

private def clusterCentersWithNorm: Iterable[BreezeVectorWithNorm] =
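
For context, predict and computeCost are the public entry points touched here, and both now read the centers through a broadcast. A usage sketch (parameter values are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    object KMeansUsage {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("kmeans-usage"))
        val data = sc.parallelize(Seq(
          Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
          Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)))

        val model = KMeans.train(data, 2, 10)
        // Both calls below now broadcast the (possibly large) center array instead of
        // serializing it into every task closure.
        val assignments = model.predict(data).collect()
        val cost = model.computeCost(data)
        println(s"cost = $cost, assignments = ${assignments.mkString(",")}")

        sc.stop()
      }
    }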
@@ -163,6 +163,7 @@ object GradientDescent extends Logging {

// Initialize weights as a column vector
var weights = Vectors.dense(initialWeights.toArray)
val n = weights.size

/**
* For the first iteration, the regVal will be initialized as sum of weight squares
@@ -172,12 +173,13 @@
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
.aggregate((BDV.zeros[Double](weights.size), 0.0))(
.aggregate((BDV.zeros[Double](n), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
(grad, loss + l)
},
combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
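
Two things changed in this hunk: weights.size is hoisted into val n on the driver, so the aggregate zero value no longer references weights, and the gradient itself reads the weights through bcWeights.value. A reduced sketch of the same aggregate(zero)(seqOp, combOp) shape, with the real Gradient replaced by a toy least-squares update (everything here is illustrative, not the MLlib implementation):

    import breeze.linalg.{DenseVector => BDV}
    import org.apache.spark.{SparkConf, SparkContext}

    object AggregateWithBroadcastWeights {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("agg-bc"))
        val n = 3
        val data = sc.parallelize(Seq(
          (1.0, Array(1.0, 0.0, 2.0)),
          (0.0, Array(0.5, 1.5, 0.0))))
        val bcWeights = sc.broadcast(Array(0.1, 0.2, 0.3))

        // Same shape as the mini-batch gradient sum above: the zero value is built from the
        // scalar n, and the weights are only read through the broadcast inside seqOp.
        val (gradSum, lossSum) = data.aggregate((BDV.zeros[Double](n), 0.0))(
          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
            val margin = bcWeights.value.zip(features).map { case (w, x) => w * x }.sum
            val residual = margin - label
            (grad + new BDV(features.map(_ * residual)), loss + 0.5 * residual * residual)
          },
          combOp = (c1, c2) => (c1, c2) match { case ((g1, l1), (g2, l2)) => (g1 + g2, l1 + l2) }
        )
        println(s"gradient sum = $gradSum, loss sum = $lossSum")
        sc.stop()
      }
    }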
@@ -195,13 +195,14 @@ object LBFGS extends Logging {

override def calculate(weights: BDV[Double]) = {
// Have a local copy to avoid the serialization of CostFun object which is not serializable.
val localData = data
val localGradient = gradient
val n = weights.length
val bcWeights = data.context.broadcast(weights)

val (gradientSum, lossSum) = localData.aggregate((BDV.zeros[Double](weights.size), 0.0))(
val (gradientSum, lossSum) = data.aggregate((BDV.zeros[Double](n), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = localGradient.compute(
features, label, Vectors.fromBreeze(weights), Vectors.fromBreeze(grad))
features, label, Vectors.fromBreeze(bcWeights.value), Vectors.fromBreeze(grad))
(grad, loss + l)
},
combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
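
The comment above explains why the locals exist: CostFun itself is not serializable, so anything the closures need is first copied out of the object. This change keeps localGradient, drops localData (the RDD is only used on the driver to launch the job), and routes the weights through a broadcast instead of re-serializing them per task. A small illustrative example of the copy-to-local idiom for a non-serializable enclosing class (the class and fields are made up):

    import org.apache.spark.rdd.RDD

    // Made-up example: the enclosing object cannot be shipped to executors.
    class Scorer(val data: RDD[Double], val factor: Double, log: java.io.PrintWriter) {
      def total(): Double = {
        // Writing data.map(_ * factor) would compile to this.factor, capture `this`, and fail
        // with "Task not serializable" because of the PrintWriter field. Copying the field
        // into a local keeps the closure small and serializable.
        val localFactor = factor
        data.map(_ * localFactor).fold(0.0)(_ + _)
      }
    }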
@@ -56,9 +56,12 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double
// A small optimization to avoid serializing the entire model. Only the weightsMatrix
// and intercept is needed.
val localWeights = weights
val bcWeights = testData.context.broadcast(localWeights)
val localIntercept = intercept

testData.map(v => predictPoint(v, localWeights, localIntercept))
testData.mapPartitions { iter =>
val w = bcWeights.value
iter.map(v => predictPoint(v, w, localIntercept))
}
}

/**
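
Same shape as the NaiveBayes change above, with one refinement: only the potentially large weights vector goes through a broadcast, while the scalar intercept is still captured directly, since a single Double per task costs nothing. A small sketch of that split (the helper object and its parameters are made up for illustration):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Vector

    object SplitModelBroadcast {
      // Illustrative only: broadcast the big part of a model, capture the cheap part directly.
      def scoreAll(points: RDD[Vector], weights: Vector, intercept: Double)
                  (score: (Vector, Vector, Double) => Double): RDD[Double] = {
        val bcWeights = points.context.broadcast(weights) // O(numFeatures): worth a broadcast
        val localIntercept = intercept                    // a single Double: fine per task
        points.mapPartitions { iter =>
          val w = bcWeights.value
          iter.map(v => score(v, w, localIntercept))
        }
      }
    }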
@@ -92,8 +92,6 @@ public void runLRUsingStaticMethods() {
testRDD.rdd(), 100, 1.0, 1.0);

int numAccurate = validatePrediction(validationData, model);
System.out.println(numAccurate);
Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
}

}
@@ -25,7 +25,7 @@ import org.scalatest.Matchers

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}

object LogisticRegressionSuite {

@@ -126,3 +126,19 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Match
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
}

class LogisticRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {

test("task size should be small in both training and prediction") {
val m = 4
val n = 200000
val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
val random = new Random(idx)
iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
}.cache()
// If we serialize data directly in the task closure, the size of the serialized task would be
// greater than 1MB and hence Spark would throw an error.
val model = LogisticRegressionWithSGD.train(points, 2)
val predictions = model.predict(points.map(_.features))
}
}
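
LocalClusterSparkContext itself is not part of this diff, so its exact settings are not visible here. Based on the 1 MB limit mentioned in the test comment, a plausible sketch of such a trait is the following, assuming a local-cluster master and spark.akka.frameSize tightened to 1 MB; the trait actually added by the PR may differ:

    import org.scalatest.{BeforeAndAfterAll, Suite}
    import org.apache.spark.{SparkConf, SparkContext}

    trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
      @transient var sc: SparkContext = _

      override def beforeAll() {
        // Two workers with 512 MB each; a 1 MB frame size makes oversized tasks fail loudly.
        val conf = new SparkConf()
          .setMaster("local-cluster[2, 1, 512]")
          .setAppName("test-cluster")
          .set("spark.akka.frameSize", "1")
        sc = new SparkContext(conf)
        super.beforeAll()
      }

      override def afterAll() {
        if (sc != null) {
          sc.stop()
        }
        super.afterAll()
      }
    }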
@@ -23,7 +23,7 @@ import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}

object NaiveBayesSuite {

@@ -96,3 +96,21 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext {
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
}

class NaiveBayesClusterSuite extends FunSuite with LocalClusterSparkContext {

test("task size should be small in both training and prediction") {
val m = 10
val n = 200000
val examples = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
val random = new Random(idx)
iter.map { i =>
LabeledPoint(random.nextInt(2), Vectors.dense(Array.fill(n)(random.nextDouble())))
}
}
// If we serialize data directly in the task closure, the size of the serialized task would be
// greater than 1MB and hence Spark would throw an error.
val model = NaiveBayes.train(examples)
val predictions = model.predict(examples.map(_.features))
}
}
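
The 1 MB figure in the comment is easy to relate to the shapes used in this test: n = 200,000 doubles already exceeds it on its own, and the trained model carries a numClasses x n theta matrix on top of that. Rough arithmetic, ignoring object overhead:

    val n = 200000
    val bytesPerExample = n * 8L            // 1,600,000 bytes: one dense feature vector
    val bytesPerModel = 2L * n * 8 + 2 * 8  // theta (2 x n) plus pi (2): about 3.2 MB
    // Either one, captured in a task closure, would push the serialized task past the
    // roughly 1 MB limit the comment refers to -- hence the broadcasts in NaiveBayes.predict.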
@@ -17,17 +17,16 @@

package org.apache.spark.mllib.classification

import scala.util.Random
import scala.collection.JavaConversions._

import org.scalatest.FunSuite
import scala.util.Random

import org.jblas.DoubleMatrix
import org.scalatest.FunSuite

import org.apache.spark.SparkException
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}

object SVMSuite {

@@ -193,3 +192,19 @@ class SVMSuite extends FunSuite with LocalSparkContext {
new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
}
}

class SVMClusterSuite extends FunSuite with LocalClusterSparkContext {

test("task size should be small in both training and prediction") {
val m = 4
val n = 200000
val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
val random = new Random(idx)
iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
}.cache()
// If we serialize data directly in the task closure, the size of the serialized task would be
// greater than 1MB and hence Spark would throw an error.
val model = SVMWithSGD.train(points, 2)
val predictions = model.predict(points.map(_.features))
}
}
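
One caveat that applies to all three cluster suites as written: model.predict(points.map(_.features)) only defines an RDD, and no action is called on predictions, so the prediction job is never actually submitted. If the goal is to exercise prediction task sizes as well as training, an action would be needed, for example:

    val predictions = model.predict(points.map(_.features))
    predictions.count() // forces the prediction tasks to be serialized and run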