From 7ac9981646cc6b56cf2ee7506008df584ba3e066 Mon Sep 17 00:00:00 2001
From: lizhengbing
Date: Thu, 1 Jan 2015 19:44:18 +0800
Subject: [PATCH 1/7] test git

---
 testlzb | Bin 0 -> 14 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 testlzb

diff --git a/testlzb b/testlzb
new file mode 100644
index 0000000000000000000000000000000000000000..25b690689b298649c027af668c051282a96eed6c
GIT binary patch
literal 14
VcmezWuY@6$p_rkBftP`c0RSrC1JwWk

literal 0
HcmV?d00001

From cbca8b7a0791335a352462e39656cb9f3e9e80e2 Mon Sep 17 00:00:00 2001
From: lizhengbing
Date: Thu, 1 Jan 2015 19:47:42 +0800
Subject: [PATCH 2/7] Update testlzb

just for test
---
 testlzb | Bin 14 -> 12 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)

diff --git a/testlzb b/testlzb
index 25b690689b298649c027af668c051282a96eed6c..8edb37e36dfbc0094246c945074c1f0a41d9047d 100644
GIT binary patch
literal 12
TcmYdHO6E#QP2)<>$m9Y587~8@

literal 14
VcmezWuY@6$p_rkBftP`c0RSrC1JwWk

From 5d54771ff0e06e800cc6292741d345ced102ddfa Mon Sep 17 00:00:00 2001
From: lizhengbing
Date: Thu, 1 Jan 2015 20:06:11 +0800
Subject: [PATCH 3/7] testlzb has been deleted

---
 testlzb | 3 ---
 1 file changed, 3 deletions(-)
 delete mode 100644 testlzb

diff --git a/testlzb b/testlzb
deleted file mode 100644
index 8edb37e36dfb..000000000000
--- a/testlzb
+++ /dev/null
@@ -1,3 +0,0 @@
-abc
-def
-ghi

From 35aeeccee6d636b2d4e220d9274f9ce3a2e474f4 Mon Sep 17 00:00:00 2001
From: lizhengbing
Date: Thu, 1 Jan 2015 20:54:57 +0800
Subject: [PATCH 4/7] add testmastet.txt

---
 testmastet.txt | Bin 0 -> 42 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 testmastet.txt

diff --git a/testmastet.txt b/testmastet.txt
new file mode 100644
index 0000000000000000000000000000000000000000..07c40969a4154d57f2b67d09709c799dde99296d
GIT binary patch
literal 42
icmezWuY@6$p_rkBA(tT$Os6swF(?2bGM|@$iva-m

literal 0
HcmV?d00001

From: lizhengbing
Date: Thu, 1 Jan 2015 21:05:15 +0800
Subject: [PATCH 5/7] delete testmastet.txt

---
 testmastet.txt | Bin 42 -> 0 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 testmastet.txt

diff --git a/testmastet.txt b/testmastet.txt
deleted file mode 100644
index 07c40969a4154d57f2b67d09709c799dde99296d..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 42
icmezWuY@6$p_rkBA(tT$Os6swF(?2bGM|@$iva-m

From: lizhengbing
Date: Sun, 4 Jan 2015 21:52:14 +0800
Subject: [PATCH 6/7] add SVMWithLBFGS and corresponding test case in mllib

---
 .../spark/mllib/classification/SVM.scala      |  16 ++
 .../spark/mllib/classification/SVMSuite.scala | 153 +++++++++++++++++-
 2 files changed, 165 insertions(+), 4 deletions(-)
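Usage sketch (annotation, not part of the commit): SVMWithLBFGS wires the
existing LBFGS optimizer, HingeGradient, and SquaredL2Updater into
GeneralizedLinearAlgorithm, so it is driven the same way as SVMWithSGD;
only the optimizer knobs differ, since L-BFGS has no step size to tune.
A minimal example, assuming a SparkContext sc and training data
training: RDD[LabeledPoint] with labels in {0, 1}:

    // Assumed in scope: sc: SparkContext, training: RDD[LabeledPoint]
    // with labels 0.0/1.0 (binaryLabelValidator enforces this at run()).
    val svm = new SVMWithLBFGS().setIntercept(true)
    // Configure the optimizer directly, as the tests in this patch do.
    svm.optimizer.setNumIterations(100)
    svm.optimizer.setRegParam(0.01)
    val model = svm.run(training)                       // SVMModel
    val labels = model.predict(training.map(_.features))

setRegParam (and setConvergenceTol, setNumCorrections) are the standard
mllib LBFGS setters; only setNumIterations is exercised by the tests below.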
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index dd514ff8a37f..f79954599355 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -190,3 +190,19 @@ object SVMWithSGD {
     train(input, numIterations, 1.0, 0.01, 1.0)
   }
 }
+
+/**
+ * Train a classification model for SVM using Limited-memory BFGS.
+ * NOTE: Labels used in SVM should be {0, 1}
+ */
+class SVMWithLBFGS
+  extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
+
+  override val optimizer = new LBFGS(new HingeGradient(), new SquaredL2Updater())
+
+  override protected val validators = List(DataValidators.binaryLabelValidator)
+
+  override protected def createModel(weights: Vector, intercept: Double) = {
+    new SVMModel(weights, intercept)
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index a2de7fbd4138..3306cb34dc28 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -68,7 +68,7 @@ class SVMSuite extends FunSuite with MLlibTestSparkContext {
     assert(numOffPredictions < input.length / 5)
   }
 
-  test("SVM with threshold") {
+  test("SVM with threshold using local random SGD") {
     val nPoints = 10000
 
     // NOTE: Intercept should be small for generating equal 0s and 1s
@@ -104,6 +104,44 @@ class SVMSuite extends FunSuite with MLlibTestSparkContext {
     predictions = model.predict(validationRDD.map(_.features)).collect()
     assert(predictions.count(_ == 1.0) == predictions.length)
   }
+
+  test("SVM with threshold using local LBFGS") {
+    val nPoints = 10000
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+
+    val svm = new SVMWithLBFGS().setIntercept(true)
+    svm.optimizer.setNumIterations(100)
+
+    val model = svm.run(testRDD)
+
+    val validationData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+
+    // Test prediction on RDD.
+
+    var predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 0.0) != predictions.length)
+
+    // High threshold makes all the predictions 0.0
+    model.setThreshold(10000.0)
+    predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 0.0) == predictions.length)
+
+    // Low threshold makes all the predictions 1.0
+    model.setThreshold(-10000.0)
+    predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 1.0) == predictions.length)
+  }
 
   test("SVM using local random SGD") {
     val nPoints = 10000
@@ -132,8 +170,36 @@ class SVMSuite extends FunSuite with MLlibTestSparkContext {
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
   }
+
+  test("SVM using local LBFGS") {
+    val nPoints = 10000
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val svm = new SVMWithLBFGS().setIntercept(true)
+    svm.optimizer.setNumIterations(100)
+
+    val model = svm.run(testRDD)
+
+    val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+
+    // Test prediction on RDD.
+    validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+    // Test prediction on Array.
+    validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+  }
 
-  test("SVM local random SGD with initial weights") {
+  test("SVM with initial weights using local random SGD") {
     val nPoints = 10000
 
     // NOTE: Intercept should be small for generating equal 0s and 1s
@@ -164,8 +230,40 @@ class SVMSuite extends FunSuite with MLlibTestSparkContext {
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
   }
+
+  test("SVM with initial weights using LBFGS") {
+    val nPoints = 10000
 
-  test("SVM with invalid labels") {
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+
+    val initialB = -1.0
+    val initialC = -1.0
+    val initialWeights = Vectors.dense(initialB, initialC)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val svm = new SVMWithLBFGS().setIntercept(true)
+    svm.optimizer.setNumIterations(100)
+
+    val model = svm.run(testRDD, initialWeights)
+
+    val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+    val validationRDD = sc.parallelize(validationData,2)
+
+    // Test prediction on RDD.
+    validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+    // Test prediction on Array.
+    validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+  }
+
+  test("SVM with invalid labels using local random SGD") {
     val nPoints = 10000
 
     // NOTE: Intercept should be small for generating equal 0s and 1s
@@ -191,11 +289,41 @@ class SVMSuite extends FunSuite with MLlibTestSparkContext {
     // Turning off data validation should not throw an exception
     new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
   }
+
+  test("SVM with invalid labels using LBFGS") {
+    val nPoints = 10000
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+    val testRDD = sc.parallelize(testData, 2)
+
+    val testRDDInvalid = testRDD.map { lp =>
+      if (lp.label == 0.0) {
+        LabeledPoint(-1.0, lp.features)
+      } else {
+        lp
+      }
+    }
+
+    intercept[SparkException] {
+      val svm = new SVMWithLBFGS()
+      svm.optimizer.setNumIterations(100)
+      svm.run(testRDDInvalid)
+    }
+
+    // Turning off data validation should not throw an exception
+    new SVMWithLBFGS().setValidateData(false).run(testRDDInvalid)
+  }
+
 }
 
 class SVMClusterSuite extends FunSuite with LocalClusterSparkContext {
 
-  test("task size should be small in both training and prediction") {
+  test("task size should be small in both training and prediction using random SGD") {
     val m = 4
     val n = 200000
     val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
       val random = new Random(idx)
       iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
     }.cache()
@@ -207,4 +335,21 @@ class SVMClusterSuite extends FunSuite with LocalClusterSparkContext {
     val model = SVMWithSGD.train(points, 2)
     val predictions = model.predict(points.map(_.features))
   }
+
+
+  test("task size should be small in both training and prediction using LBFGS") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val svm = new SVMWithLBFGS()
+    svm.optimizer.setNumIterations(2)
+    val model = svm.run(points)
+    val predictions = model.predict(points.map(_.features))
+  }
+
 }
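Annotation on the threshold tests above (not part of the series):
SVMModel.predict compares the raw margin w . x + b against the configured
threshold (default 0.0), which is why setThreshold(10000.0) forces every
prediction to 0.0 and setThreshold(-10000.0) forces 1.0. A self-contained
sketch of that decision rule, with illustrative names rather than MLlib
internals:

    // label = 1.0 iff margin > threshold; clearThreshold() makes
    // predict return the raw margin itself.
    def margin(w: Array[Double], intercept: Double, x: Array[Double]): Double =
      w.zip(x).map { case (wi, xi) => wi * xi }.sum + intercept

    def predictLabel(m: Double, threshold: Double): Double =
      if (m > threshold) 1.0 else 0.0

    predictLabel(margin(Array(-1.5, 1.0), 0.01, Array(0.3, 0.8)), 0.0)      // 1.0
    predictLabel(margin(Array(-1.5, 1.0), 0.01, Array(0.3, 0.8)), 10000.0)  // 0.0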
From 202c34ed0c97b8708d173c67f79d2060ff1b6ca7 Mon Sep 17 00:00:00 2001
From: lizhengbing
Date: Sun, 22 Mar 2015 21:21:49 +0800
Subject: [PATCH 7/7] A change

---
 .../spark/mllib/classification/SVMSuite.scala | 246 ++++++++++++++++++
 1 file changed, 246 insertions(+)
 create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
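Annotation (not part of the commit): the new "model save/load" test below
round-trips an SVMModel through save/load. A minimal sketch of that round
trip, assuming a SparkContext sc and a hypothetical scratch path:

    import org.apache.spark.mllib.classification.SVMModel
    import org.apache.spark.mllib.linalg.Vectors

    val model = new SVMModel(weights = Vectors.dense(0.1, 0.2, 0.3), intercept = 0.5)
    val path = "/tmp/svm-model-demo"   // hypothetical; the test uses Utils.createTempDir()
    model.save(sc, path)               // persists the model's metadata and data under path
    val restored = SVMModel.load(sc, path)
    assert(restored.weights == model.weights && restored.intercept == model.intercept)

Note that the threshold travels with the model: a value set via
setThreshold(0.7) before save is visible via getThreshold after load, which
is exactly what the second half of the test asserts.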
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
new file mode 100644
index 000000000000..6de098b383ba
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkException
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
+import org.apache.spark.util.Utils
+
+object SVMSuite {
+
+  def generateSVMInputAsList(
+      intercept: Double,
+      weights: Array[Double],
+      nPoints: Int,
+      seed: Int): java.util.List[LabeledPoint] = {
+    seqAsJavaList(generateSVMInput(intercept, weights, nPoints, seed))
+  }
+
+  // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise)
+  def generateSVMInput(
+      intercept: Double,
+      weights: Array[Double],
+      nPoints: Int,
+      seed: Int): Seq[LabeledPoint] = {
+    val rnd = new Random(seed)
+    val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
+    val x = Array.fill[Array[Double]](nPoints)(
+      Array.fill[Double](weights.length)(rnd.nextDouble() * 2.0 - 1.0))
+    val y = x.map { xi =>
+      val yD = new DoubleMatrix(1, xi.length, xi: _*).dot(weightsMat) +
+        intercept + 0.01 * rnd.nextGaussian()
+      if (yD < 0) 0.0 else 1.0
+    }
+    y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
+  }
+
+  /** Binary labels, 3 features */
+  private val binaryModel = new SVMModel(weights = Vectors.dense(0.1, 0.2, 0.3), intercept = 0.5)
+
+}
+
+class SVMSuite extends FunSuite with MLlibTestSparkContext {
+
+  def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+    val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
+      prediction != expected.label
+    }
+    // At least 80% of the predictions should be on.
+    assert(numOffPredictions < input.length / 5)
+  }
+
+  test("SVM with threshold") {
+    val nPoints = 10000
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val svm = new SVMWithSGD().setIntercept(true)
+    svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+    val model = svm.run(testRDD)
+
+    val validationData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+
+    // Test prediction on RDD.
+
+    var predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 0.0) != predictions.length)
+
+    // High threshold makes all the predictions 0.0
+    model.setThreshold(10000.0)
+    predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 0.0) == predictions.length)
+
+    // Low threshold makes all the predictions 1.0
+    model.setThreshold(-10000.0)
+    predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 1.0) == predictions.length)
+  }
+
+  test("SVM using local random SGD") {
+    val nPoints = 10000
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val svm = new SVMWithSGD().setIntercept(true)
+    svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+    val model = svm.run(testRDD)
+
+    val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+
+    // Test prediction on RDD.
+    validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+    // Test prediction on Array.
+    validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+  }
+
+  test("SVM local random SGD with initial weights") {
+    val nPoints = 10000
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+
+    val initialB = -1.0
+    val initialC = -1.0
+    val initialWeights = Vectors.dense(initialB, initialC)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val svm = new SVMWithSGD().setIntercept(true)
+    svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+    val model = svm.run(testRDD, initialWeights)
+
+    val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+    val validationRDD = sc.parallelize(validationData,2)
+
+    // Test prediction on RDD.
+    validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+    // Test prediction on Array.
+    validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+  }
+
+  test("SVM with invalid labels") {
+    val nPoints = 10000
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+    val testRDD = sc.parallelize(testData, 2)
+
+    val testRDDInvalid = testRDD.map { lp =>
+      if (lp.label == 0.0) {
+        LabeledPoint(-1.0, lp.features)
+      } else {
+        lp
+      }
+    }
+
+    intercept[SparkException] {
+      SVMWithSGD.train(testRDDInvalid, 100)
+    }
+
+    // Turning off data validation should not throw an exception
+    new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
+  }
+
+  test("model save/load") {
+    // NOTE: This will need to be generalized once there are multiple model format versions.
+    val model = SVMSuite.binaryModel
+
+    model.clearThreshold()
+    assert(model.getThreshold.isEmpty)
+
+    val tempDir = Utils.createTempDir()
+    val path = tempDir.toURI.toString
+
+    // Save model, load it back, and compare.
+    try {
+      model.save(sc, path)
+      val sameModel = SVMModel.load(sc, path)
+      assert(model.weights == sameModel.weights)
+      assert(model.intercept == sameModel.intercept)
+      assert(sameModel.getThreshold.isEmpty)
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+
+    // Save model with threshold.
+    try {
+      model.setThreshold(0.7)
+      model.save(sc, path)
+      val sameModel2 = SVMModel.load(sc, path)
+      assert(model.getThreshold.get == sameModel2.getThreshold.get)
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+}
+
+class SVMClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val model = SVMWithSGD.train(points, 2)
+    val predictions = model.predict(points.map(_.features))
+  }
+}
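Closing annotation: both cluster-suite tests build their 200000-feature
points inside mapPartitionsWithIndex so the large arrays are materialized
on the executors rather than captured in the task closure, which is what
keeps the serialized tasks under the 1MB limit the comments mention. A
contrast sketch (illustrative names, not from the patch), assuming a
SparkContext sc:

    // Anti-pattern: bigLocal is captured by the closure and shipped with every task.
    val bigLocal = Array.fill(200000)(scala.util.Random.nextDouble())
    val bad = sc.parallelize(0 until 4, 2).map(i => bigLocal(i))

    // Pattern used by the suites: generate per partition on the executors,
    // so the closure only carries the small values n and idx.
    val n = 200000
    val good = sc.parallelize(0 until 4, 2).mapPartitionsWithIndex { (idx, iter) =>
      val random = new scala.util.Random(idx)
      iter.map(_ => Array.fill(n)(random.nextDouble()))
    }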