
Commit 7a4190a

Merge branch 'master' of github.com:apache/spark into handle-configs-bash
2 parents 7396be2 + 2e069ca

6 files changed: +75 / -131 lines


examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala

Lines changed: 4 additions & 4 deletions
@@ -21,7 +21,7 @@ import org.apache.log4j.{Level, Logger}
 import scopt.OptionParser
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
+import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, SVMWithSGD}
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.mllib.optimization.{SquaredL2Updater, L1Updater}
@@ -66,7 +66,8 @@ object BinaryClassification {
         .text("number of iterations")
         .action((x, c) => c.copy(numIterations = x))
       opt[Double]("stepSize")
-        .text(s"initial step size, default: ${defaultParams.stepSize}")
+        .text("initial step size (ignored by logistic regression), " +
+          s"default: ${defaultParams.stepSize}")
         .action((x, c) => c.copy(stepSize = x))
       opt[String]("algorithm")
         .text(s"algorithm (${Algorithm.values.mkString(",")}), " +
@@ -125,10 +126,9 @@ object BinaryClassification {
 
     val model = params.algorithm match {
       case LR =>
-        val algorithm = new LogisticRegressionWithSGD()
+        val algorithm = new LogisticRegressionWithLBFGS()
         algorithm.optimizer
           .setNumIterations(params.numIterations)
-          .setStepSize(params.stepSize)
           .setUpdater(updater)
           .setRegParam(params.regParam)
         algorithm.run(training).clearThreshold()

mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala

Lines changed: 5 additions & 35 deletions
@@ -73,6 +73,8 @@ class LogisticRegressionModel (
 /**
  * Train a classification model for Logistic Regression using Stochastic Gradient Descent.
  * NOTE: Labels used in Logistic Regression should be {0, 1}
+ *
+ * Using [[LogisticRegressionWithLBFGS]] is recommended over this.
  */
 class LogisticRegressionWithSGD private (
     private var stepSize: Double,
@@ -191,51 +193,19 @@ object LogisticRegressionWithSGD {
 
 /**
  * Train a classification model for Logistic Regression using Limited-memory BFGS.
+ * Standard feature scaling and L2 regularization are used by default.
  * NOTE: Labels used in Logistic Regression should be {0, 1}
  */
-class LogisticRegressionWithLBFGS private (
-    private var convergenceTol: Double,
-    private var maxNumIterations: Int,
-    private var regParam: Double)
+class LogisticRegressionWithLBFGS
   extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
 
-  /**
-   * Construct a LogisticRegression object with default parameters
-   */
-  def this() = this(1E-4, 100, 0.0)
-
   this.setFeatureScaling(true)
 
-  private val gradient = new LogisticGradient()
-  private val updater = new SimpleUpdater()
-  // Have to return new LBFGS object every time since users can reset the parameters anytime.
-  override def optimizer = new LBFGS(gradient, updater)
-    .setNumCorrections(10)
-    .setConvergenceTol(convergenceTol)
-    .setMaxNumIterations(maxNumIterations)
-    .setRegParam(regParam)
+  override val optimizer = new LBFGS(new LogisticGradient, new SquaredL2Updater)
 
   override protected val validators = List(DataValidators.binaryLabelValidator)
 
-  /**
-   * Set the convergence tolerance of iterations for L-BFGS. Default 1E-4.
-   * Smaller value will lead to higher accuracy with the cost of more iterations.
-   */
-  def setConvergenceTol(convergenceTol: Double): this.type = {
-    this.convergenceTol = convergenceTol
-    this
-  }
-
-  /**
-   * Set the maximal number of iterations for L-BFGS. Default 100.
-   */
-  def setNumIterations(numIterations: Int): this.type = {
-    this.maxNumIterations = numIterations
-    this
-  }
-
   override protected def createModel(weights: Vector, intercept: Double) = {
     new LogisticRegressionModel(weights, intercept)
   }
-
 }
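
With this change, LogisticRegressionWithLBFGS no longer carries its own setConvergenceTol / setNumIterations setters; the LBFGS instance is exposed as a val, so callers tune it through the optimizer member. A minimal usage sketch (not part of the diff), assuming an existing training: RDD[LabeledPoint] and illustrative parameter values:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

// Build the algorithm and tune its L-BFGS optimizer directly.
val lr = new LogisticRegressionWithLBFGS().setIntercept(true)
lr.optimizer
  .setNumIterations(50)      // illustrative value
  .setConvergenceTol(1e-4)
  .setRegParam(0.01)
val model = lr.run(training)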

mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala

Lines changed: 9 additions & 0 deletions
@@ -69,8 +69,17 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
 
   /**
    * Set the maximal number of iterations for L-BFGS. Default 100.
+   * @deprecated use [[LBFGS#setNumIterations]] instead
    */
+  @deprecated("use setNumIterations instead", "1.1.0")
   def setMaxNumIterations(iters: Int): this.type = {
+    this.setNumIterations(iters)
+  }
+
+  /**
+   * Set the maximal number of iterations for L-BFGS. Default 100.
+   */
+  def setNumIterations(iters: Int): this.type = {
     this.maxNumIterations = iters
     this
   }
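
Existing callers of setMaxNumIterations keep compiling after this change, since the old setter now delegates to setNumIterations, but they will see a deprecation warning. A small sketch of the renamed call chain (values are illustrative, not from the diff):

import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}

// Same configuration as before, with the renamed setter.
val lbfgs = new LBFGS(new LogisticGradient, new SquaredL2Updater)
  .setNumCorrections(10)
  .setConvergenceTol(1e-4)
  .setNumIterations(100)   // previously .setMaxNumIterations(100), now deprecated
  .setRegParam(0.0)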

mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala

Lines changed: 42 additions & 78 deletions
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.stat.correlation
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
-import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
 
 /**
  * Compute Spearman's correlation for two RDDs of the type RDD[Double] or the correlation matrix
@@ -43,87 +43,51 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
   /**
    * Compute Spearman's correlation matrix S, for the input matrix, where S(i, j) is the
    * correlation between column i and j.
-   *
-   * Input RDD[Vector] should be cached or checkpointed if possible since it would be split into
-   * numCol RDD[Double]s, each of which sorted, and the joined back into a single RDD[Vector].
    */
   override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
-    val indexed = X.zipWithUniqueId()
-
-    val numCols = X.first.size
-    if (numCols > 50) {
-      logWarning("Computing the Spearman correlation matrix can be slow for large RDDs with more"
-        + " than 50 columns.")
-    }
-    val ranks = new Array[RDD[(Long, Double)]](numCols)
-
-    // Note: we use a for loop here instead of a while loop with a single index variable
-    // to avoid race condition caused by closure serialization
-    for (k <- 0 until numCols) {
-      val column = indexed.map { case (vector, index) => (vector(k), index) }
-      ranks(k) = getRanks(column)
+    // ((columnIndex, value), rowUid)
+    val colBased = X.zipWithUniqueId().flatMap { case (vec, uid) =>
+      vec.toArray.view.zipWithIndex.map { case (v, j) =>
+        ((j, v), uid)
+      }
     }
-
-    val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
-    PearsonCorrelation.computeCorrelationMatrix(ranksMat)
-  }
-
-  /**
-   * Compute the ranks for elements in the input RDD, using the average method for ties.
-   *
-   * With the average method, elements with the same value receive the same rank that's computed
-   * by taking the average of their positions in the sorted list.
-   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
-   * Note that positions here are 0-indexed, instead of the 1-indexed as in the definition for
-   * ranks in the standard definition for Spearman's correlation. This does not affect the final
-   * results and is slightly more performant.
-   *
-   * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
-   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId is
-   *         copied from the input RDD.
-   */
-  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
-    // Get elements' positions in the sorted list for computing average rank for duplicate values
-    val sorted = indexed.sortByKey().zipWithIndex()
-
-    val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
-      // add an extra element to signify the end of the list so that flatMap can flush the last
-      // batch of duplicates
-      val end = -1L
-      val padded = iter ++ Iterator[((Double, Long), Long)](((Double.NaN, end), end))
-      val firstEntry = padded.next()
-      var lastVal = firstEntry._1._1
-      var firstRank = firstEntry._2.toDouble
-      val idBuffer = ArrayBuffer(firstEntry._1._2)
-      padded.flatMap { case ((v, id), rank) =>
-        if (v == lastVal && id != end) {
-          idBuffer += id
-          Iterator.empty
-        } else {
-          val entries = if (idBuffer.size == 1) {
-            Iterator((idBuffer(0), firstRank))
-          } else {
-            val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
-            idBuffer.map(id => (id, averageRank))
-          }
-          lastVal = v
-          firstRank = rank
-          idBuffer.clear()
-          idBuffer += id
-          entries
+    // global sort by (columnIndex, value)
+    val sorted = colBased.sortByKey()
+    // assign global ranks (using average ranks for tied values)
+    val globalRanks = sorted.zipWithIndex().mapPartitions { iter =>
+      var preCol = -1
+      var preVal = Double.NaN
+      var startRank = -1.0
+      var cachedUids = ArrayBuffer.empty[Long]
+      val flush: () => Iterable[(Long, (Int, Double))] = () => {
+        val averageRank = startRank + (cachedUids.size - 1) / 2.0
+        val output = cachedUids.map { uid =>
+          (uid, (preCol, averageRank))
         }
+        cachedUids.clear()
+        output
       }
+      iter.flatMap { case (((j, v), uid), rank) =>
+        // If we see a new value or cachedUids is too big, we flush ids with their average rank.
+        if (j != preCol || v != preVal || cachedUids.size >= 10000000) {
+          val output = flush()
+          preCol = j
+          preVal = v
+          startRank = rank
+          cachedUids += uid
+          output
+        } else {
+          cachedUids += uid
+          Iterator.empty
+        }
+      } ++ flush()
     }
-    ranks
-  }
-
-  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
-    val partitioner = new HashPartitioner(input.partitions.size)
-    val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
-    cogrouped.map {
-      case (_, values: Array[Iterable[_]]) =>
-        val doubles = values.asInstanceOf[Array[Iterable[Double]]]
-        new DenseVector(doubles.flatten.toArray)
+    // Replace values in the input matrix by their ranks compared with values in the same column.
+    // Note that shifting all ranks in a column by a constant value doesn't affect result.
+    val groupedRanks = globalRanks.groupByKey().map { case (uid, iter) =>
+      // sort by column index and then convert values to a vector
+      Vectors.dense(iter.toSeq.sortBy(_._1).map(_._2).toArray)
     }
+    PearsonCorrelation.computeCorrelationMatrix(groupedRanks)
   }
 }
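
SpearmanCorrelation is private[stat]; applications typically reach it through the public Statistics.corr entry point, which dispatches here for method = "spearman". A minimal usage sketch (not part of the diff), assuming an existing SparkContext sc and a small toy dataset:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// Each Vector is a row; Spearman ranks each column, then computes Pearson correlation on the ranks.
val X = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0, 100.0),
  Vectors.dense(2.0, 20.0, 200.0),
  Vectors.dense(3.0, 30.0, 300.0)))
val corrMatrix = Statistics.corr(X, "spearman")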

mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -272,8 +272,9 @@ class LogisticRegressionClusterSuite extends FunSuite with LocalClusterSparkCont
     }.cache()
     // If we serialize data directly in the task closure, the size of the serialized task would be
     // greater than 1MB and hence Spark would throw an error.
-    val model =
-      (new LogisticRegressionWithLBFGS().setIntercept(true).setNumIterations(2)).run(points)
+    val lr = new LogisticRegressionWithLBFGS().setIntercept(true)
+    lr.optimizer.setNumIterations(2)
+    val model = lr.run(points)
 
     val predictions = model.predict(points.map(_.features))
 
mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala

Lines changed: 12 additions & 12 deletions
@@ -55,15 +55,15 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
 
     val initialWeightsWithIntercept = Vectors.dense(1.0 +: initialWeights.toArray)
     val convergenceTol = 1e-12
-    val maxNumIterations = 10
+    val numIterations = 10
 
     val (_, loss) = LBFGS.runLBFGS(
       dataRDD,
       gradient,
       simpleUpdater,
       numCorrections,
       convergenceTol,
-      maxNumIterations,
+      numIterations,
       regParam,
       initialWeightsWithIntercept)
 
@@ -99,15 +99,15 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
     // Prepare another non-zero weights to compare the loss in the first iteration.
     val initialWeightsWithIntercept = Vectors.dense(0.3, 0.12)
     val convergenceTol = 1e-12
-    val maxNumIterations = 10
+    val numIterations = 10
 
     val (weightLBFGS, lossLBFGS) = LBFGS.runLBFGS(
       dataRDD,
       gradient,
       squaredL2Updater,
       numCorrections,
       convergenceTol,
-      maxNumIterations,
+      numIterations,
       regParam,
       initialWeightsWithIntercept)
 
@@ -140,10 +140,10 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
 
     /**
      * For the first run, we set the convergenceTol to 0.0, so that the algorithm will
-     * run up to the maxNumIterations which is 8 here.
+     * run up to the numIterations which is 8 here.
      */
     val initialWeightsWithIntercept = Vectors.dense(0.0, 0.0)
-    val maxNumIterations = 8
+    val numIterations = 8
     var convergenceTol = 0.0
 
     val (_, lossLBFGS1) = LBFGS.runLBFGS(
@@ -152,7 +152,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
       squaredL2Updater,
       numCorrections,
       convergenceTol,
-      maxNumIterations,
+      numIterations,
       regParam,
       initialWeightsWithIntercept)
 
@@ -167,7 +167,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
       squaredL2Updater,
       numCorrections,
       convergenceTol,
-      maxNumIterations,
+      numIterations,
       regParam,
       initialWeightsWithIntercept)
 
@@ -182,7 +182,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
       squaredL2Updater,
       numCorrections,
       convergenceTol,
-      maxNumIterations,
+      numIterations,
       regParam,
       initialWeightsWithIntercept)
 
@@ -200,12 +200,12 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
     // Prepare another non-zero weights to compare the loss in the first iteration.
     val initialWeightsWithIntercept = Vectors.dense(0.3, 0.12)
     val convergenceTol = 1e-12
-    val maxNumIterations = 10
+    val numIterations = 10
 
     val lbfgsOptimizer = new LBFGS(gradient, squaredL2Updater)
      .setNumCorrections(numCorrections)
      .setConvergenceTol(convergenceTol)
-      .setMaxNumIterations(maxNumIterations)
+      .setNumIterations(numIterations)
      .setRegParam(regParam)
 
     val weightLBFGS = lbfgsOptimizer.optimize(dataRDD, initialWeightsWithIntercept)
@@ -241,7 +241,7 @@ class LBFGSClusterSuite extends FunSuite with LocalClusterSparkContext {
     val lbfgs = new LBFGS(new LogisticGradient, new SquaredL2Updater)
      .setNumCorrections(1)
      .setConvergenceTol(1e-12)
-      .setMaxNumIterations(1)
+      .setNumIterations(1)
      .setRegParam(1.0)
     val random = new Random(0)
     // If we serialize data directly in the task closure, the size of the serialized task would be
