From d640d9c58cd4f3caa6eac462b947b3a891dabbda Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Fri, 6 Feb 2015 11:12:49 +0800 Subject: [PATCH 01/21] online lda initial checkin --- .../spark/examples/mllib/LDAExample.scala | 5 +- .../apache/spark/mllib/clustering/LDA.scala | 144 ++++++++++++++++-- .../spark/mllib/clustering/JavaLDASuite.java | 2 +- .../spark/mllib/clustering/LDASuite.scala | 2 +- 4 files changed, 133 insertions(+), 20 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala index f4c545ad70e96..87190e9d002b6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala @@ -26,7 +26,7 @@ import scopt.OptionParser import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkContext, SparkConf} -import org.apache.spark.mllib.clustering.LDA +import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD @@ -137,7 +137,7 @@ object LDAExample { lda.setCheckpointDir(params.checkpointDir.get) } val startTime = System.nanoTime() - val ldaModel = lda.run(corpus) + val ldaModel = lda.run(corpus).asInstanceOf[DistributedLDAModel] val elapsed = (System.nanoTime() - startTime) / 1e9 println(s"Finished training LDA model. Summary:") @@ -159,6 +159,7 @@ object LDAExample { } println() } + sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index d8f82867a09d2..74183864de6ff 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -19,7 +19,9 @@ package org.apache.spark.mllib.clustering import java.util.Random -import breeze.linalg.{DenseVector => BDV, normalize, axpy => brzAxpy} +import breeze.linalg.{DenseVector => BDV, normalize, kron, sum, axpy => brzAxpy, DenseMatrix => BDM} +import breeze.numerics.{exp, abs, digamma} +import breeze.stats.distributions.Gamma import org.apache.spark.Logging import org.apache.spark.annotation.Experimental @@ -27,7 +29,7 @@ import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{Vector, DenseVector, SparseVector, Matrices} import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -250,6 +252,10 @@ class LDA private ( this } + object LDAMode extends Enumeration { + val EM, Online = Value + } + /** * Learn an LDA model using the given dataset. * @@ -259,24 +265,39 @@ class LDA private ( * Document IDs must be unique and >= 0. 
* @return Inferred LDA model */ - def run(documents: RDD[(Long, Vector)]): DistributedLDAModel = { - val state = LDA.initialState(documents, k, getDocConcentration, getTopicConcentration, seed, - checkpointDir, checkpointInterval) - var iter = 0 - val iterationTimes = Array.fill[Double](maxIterations)(0) - while (iter < maxIterations) { - val start = System.nanoTime() - state.next() - val elapsedSeconds = (System.nanoTime() - start) / 1e9 - iterationTimes(iter) = elapsedSeconds - iter += 1 + def run(documents: RDD[(Long, Vector)], mode: LDAMode.Value = LDAMode.EM ): LDAModel = { + mode match { + case LDAMode.EM => + val state = LDA.initialState(documents, k, getDocConcentration, getTopicConcentration, seed, + checkpointDir, checkpointInterval) + var iter = 0 + val iterationTimes = Array.fill[Double](maxIterations)(0) + while (iter < maxIterations) { + val start = System.nanoTime() + state.next() + val elapsedSeconds = (System.nanoTime() - start) / 1e9 + iterationTimes(iter) = elapsedSeconds + iter += 1 + } + state.graphCheckpointer.deleteAllCheckpoints() + new DistributedLDAModel(state, iterationTimes) + case LDAMode.Online => + //todo: delete the comment in next line + // I changed the return type to LDAModel, as DistributedLDAModel is based on Graph. + val vocabSize = documents.first._2.size + val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, vocabSize) + var iter = 0 + while (iter < onlineLDA.batchNumber) { + onlineLDA.next() + iter += 1 + } + new LocalLDAModel(Matrices.fromBreeze(onlineLDA._lambda).transpose) + case _ => throw new IllegalArgumentException(s"Do not support mode $mode.") } - state.graphCheckpointer.deleteAllCheckpoints() - new DistributedLDAModel(state, iterationTimes) } /** Java-friendly version of [[run()]] */ - def run(documents: JavaPairRDD[java.lang.Long, Vector]): DistributedLDAModel = { + def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = { run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) } } @@ -429,6 +450,97 @@ private[clustering] object LDA { } + // todo: add reference to paper and Hoffman + class OnlineLDAOptimizer( + val documents: RDD[(Long, Vector)], + val k: Int, + val vocabSize: Int) extends Serializable{ + + private val kappa = 0.5 // (0.5, 1] how quickly old information is forgotten + private val tau0 = 1024 // down weights early iterations + private val D = documents.count() + private val batchSize = if (D / 1000 > 4096) 4096 + else if (D / 1000 < 4) 4 + else D / 1000 + val batchNumber = (D/batchSize + 1).toInt + // todo: performance killer, need to be replaced + private val batches = documents.randomSplit(Array.fill[Double](batchNumber)(1.0)) + + // Initialize the variational distribution q(beta|lambda) + var _lambda = getGammaMatrix(k, vocabSize) // K * V + private var _Elogbeta = dirichlet_expectation(_lambda) // K * V + private var _expElogbeta = exp(_Elogbeta) // K * V + + private var batchCount = 0 + def next(): Unit = { + // weight of the mini-batch. 
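+      // (editor's note, not in the original commit) rho_t = (tau0 + batchCount)^(-kappa) is the
+      // decaying step size from Hoffman et al. (2010): it shrinks with each mini-batch so later
+      // batches perturb lambda less, while tau0 damps the influence of the very first batches.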
+ val rhot = math.pow(tau0 + batchCount, -kappa) + + var stat = BDM.zeros[Double](k, vocabSize) + stat = batches(batchCount).aggregate(stat)(seqOp, _ += _) + + stat = stat :* _expElogbeta + _lambda = _lambda * (1 - rhot) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * rhot + _Elogbeta = dirichlet_expectation(_lambda) + _expElogbeta = exp(_Elogbeta) + batchCount += 1 + } + + private def seqOp(other: BDM[Double], doc: (Long, Vector)): BDM[Double] = { + val termCounts = doc._2 + val (ids, cts) = termCounts match { + case v: DenseVector => (((0 until v.size).toList), v.values) + case v: SparseVector => (v.indices.toList, v.values) + case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) + } + + var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K + var Elogthetad = vector_dirichlet_expectation(gammad.t).t // 1 * K + var expElogthetad = exp(Elogthetad.t).t // 1 * K + val expElogbetad = _expElogbeta(::, ids).toDenseMatrix // K * ids + + var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids + var meanchange = 1D + val ctsVector = new BDV[Double](cts).t // 1 * ids + + while (meanchange > 1e-6) { + val lastgamma = gammad + // 1*K 1 * ids ids * k + gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + 1.0/k + Elogthetad = vector_dirichlet_expectation(gammad.t).t + expElogthetad = exp(Elogthetad.t).t + phinorm = expElogthetad * expElogbetad + 1e-100 + meanchange = sum(abs((gammad - lastgamma).t)) / gammad.t.size.toDouble + } + + val v1 = expElogthetad.t.toDenseMatrix.t + val v2 = (ctsVector / phinorm).t.toDenseMatrix + val outerResult = kron(v1, v2) // K * ids + for (i <- 0 until ids.size) { + other(::, ids(i)) := (other(::, ids(i)) + outerResult(::, i)) + } + other + } + + def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ + val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) + val temp = gammaRandomGenerator.sample(row * col).toArray + (new BDM[Double](col, row, temp)).t + } + + def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { + val rowSum = sum(alpha(breeze.linalg.*, ::)) + val digAlpha = digamma(alpha) + val digRowSum = digamma(rowSum) + val result = digAlpha(::, breeze.linalg.*) - digRowSum + result + } + + def vector_dirichlet_expectation(v : BDV[Double]): (BDV[Double]) ={ + digamma(v) - digamma(sum(v)) + } + } + /** * Compute gamma_{wjk}, a distribution over topics k. 
*/ diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index dc10aa67c7c1f..fbe171b4b1ab1 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -88,7 +88,7 @@ public void distributedLDAModel() { .setMaxIterations(5) .setSeed(12345); - DistributedLDAModel model = lda.run(corpus); + DistributedLDAModel model = (DistributedLDAModel)lda.run(corpus); // Check: basic parameters LocalLDAModel localModel = model.toLocal(); diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 302d751eb8a94..d36fb9b479c67 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -68,7 +68,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { .setSeed(12345) val corpus = sc.parallelize(tinyCorpus, 2) - val model: DistributedLDAModel = lda.run(corpus) + val model: DistributedLDAModel = lda.run(corpus).asInstanceOf[DistributedLDAModel] // Check: basic parameters val localModel = model.toLocal From 26dca1bddd98203e90e3cb36de4f3d16fbfbf6cc Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Fri, 6 Feb 2015 13:09:06 +0800 Subject: [PATCH 02/21] style fix and make class private --- .../org/apache/spark/mllib/clustering/LDA.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 3c7ba2a674cae..2b2025c171f1c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -420,7 +420,7 @@ private[clustering] object LDA { } // todo: add reference to paper and Hoffman - class OnlineLDAOptimizer( + private[clustering] class OnlineLDAOptimizer( val documents: RDD[(Long, Vector)], val k: Int, val vocabSize: Int) extends Serializable{ @@ -463,12 +463,12 @@ private[clustering] object LDA { case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) } - var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K - var Elogthetad = vector_dirichlet_expectation(gammad.t).t // 1 * K - var expElogthetad = exp(Elogthetad.t).t // 1 * K - val expElogbetad = _expElogbeta(::, ids).toDenseMatrix // K * ids + var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K + var Elogthetad = vector_dirichlet_expectation(gammad.t).t // 1 * K + var expElogthetad = exp(Elogthetad.t).t // 1 * K + val expElogbetad = _expElogbeta(::, ids).toDenseMatrix // K * ids - var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids + var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids var meanchange = 1D val ctsVector = new BDV[Double](cts).t // 1 * ids From f41c5ca2d2bb11394882d4212fd4138ae9a972a1 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Fri, 6 Feb 2015 14:23:12 +0800 Subject: [PATCH 03/21] style fix --- .../src/main/scala/org/apache/spark/mllib/clustering/LDA.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 2b2025c171f1c..919bde55a1b8c 100644 --- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -252,8 +252,6 @@ class LDA private ( state.graphCheckpointer.deleteAllCheckpoints() new DistributedLDAModel(state, iterationTimes) case LDAMode.Online => - //todo: delete the comment in next line - // I changed the return type to LDAModel, as DistributedLDAModel is based on Graph. val vocabSize = documents.first._2.size val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, vocabSize) var iter = 0 From 0d0f3eef6d4e2754bfa2904f30bf9e21005ae392 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 10 Feb 2015 12:30:48 +0800 Subject: [PATCH 04/21] replace random split with sliding --- .../main/scala/org/apache/spark/mllib/clustering/LDA.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 919bde55a1b8c..716cfd9e103c8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -32,6 +32,7 @@ import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer import org.apache.spark.mllib.linalg.{Vector, DenseVector, SparseVector, Matrices} import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils +import org.apache.spark.mllib.rdd.RDDFunctions._ /** @@ -430,8 +431,7 @@ private[clustering] object LDA { else if (D / 1000 < 4) 4 else D / 1000 val batchNumber = (D/batchSize + 1).toInt - // todo: performance killer, need to be replaced - private val batches = documents.randomSplit(Array.fill[Double](batchNumber)(1.0)) + private val batches = documents.sliding(batchNumber).collect() // Initialize the variational distribution q(beta|lambda) var _lambda = getGammaMatrix(k, vocabSize) // K * V From 3a06526df629b8ff1291bfb1b183f5e6af45bcde Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 10 Feb 2015 13:31:32 +0800 Subject: [PATCH 05/21] merge with new example --- .../java/org/apache/spark/examples/mllib/JavaLDAExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java index f394ff2084463..75bc3dd788ac0 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java @@ -58,7 +58,7 @@ public Tuple2 call(Tuple2 doc_id) { corpus.cache(); // Cluster the documents into three topics using LDA - DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus); + DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setK(3).run(corpus); // Output topics. 
Each is a distribution over words (matching word count vectors) System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize() From 581c623106f38d91497fb8123f47c4e661057071 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Mon, 2 Mar 2015 19:51:44 +0800 Subject: [PATCH 06/21] seperate API and adjust batch split --- .../spark/examples/mllib/JavaLDAExample.java | 2 +- .../spark/examples/mllib/LDAExample.scala | 2 +- .../apache/spark/mllib/clustering/LDA.scala | 126 +++++++++--------- .../spark/mllib/clustering/JavaLDASuite.java | 2 +- .../spark/mllib/clustering/LDASuite.scala | 2 +- 5 files changed, 65 insertions(+), 69 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java index c0d1a622ffad8..36207ae38d9a9 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java @@ -58,7 +58,7 @@ public Tuple2 call(Tuple2 doc_id) { corpus.cache(); // Cluster the documents into three topics using LDA - DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setK(3).run(corpus); + DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus); // Output topics. Each is a distribution over words (matching word count vectors) System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala index 0e1b27a8bd2ee..11399a7633638 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala @@ -159,7 +159,7 @@ object LDAExample { } println() } - + sc.stop() } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 1453e4dac768e..76ecdf92f26ed 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -32,7 +32,6 @@ import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer import org.apache.spark.mllib.linalg.{Vector, DenseVector, SparseVector, Matrices} import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils -import org.apache.spark.mllib.rdd.RDDFunctions._ /** @@ -223,10 +222,6 @@ class LDA private ( this } - object LDAMode extends Enumeration { - val EM, Online = Value - } - /** * Learn an LDA model using the given dataset. * @@ -236,37 +231,30 @@ class LDA private ( * Document IDs must be unique and >= 0. 
* @return Inferred LDA model */ - def run(documents: RDD[(Long, Vector)], mode: LDAMode.Value = LDAMode.EM ): LDAModel = { - mode match { - case LDAMode.EM => - val state = LDA.initialState(documents, k, getDocConcentration, getTopicConcentration, seed, - checkpointInterval) - var iter = 0 - val iterationTimes = Array.fill[Double](maxIterations)(0) - while (iter < maxIterations) { - val start = System.nanoTime() - state.next() - val elapsedSeconds = (System.nanoTime() - start) / 1e9 - iterationTimes(iter) = elapsedSeconds - iter += 1 - } - state.graphCheckpointer.deleteAllCheckpoints() - new DistributedLDAModel(state, iterationTimes) - case LDAMode.Online => - val vocabSize = documents.first._2.size - val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, vocabSize) - var iter = 0 - while (iter < onlineLDA.batchNumber) { - onlineLDA.next() - iter += 1 - } - new LocalLDAModel(Matrices.fromBreeze(onlineLDA._lambda).transpose) - case _ => throw new IllegalArgumentException(s"Do not support mode $mode.") + def run(documents: RDD[(Long, Vector)]): DistributedLDAModel = { + val state = LDA.initialState(documents, k, getDocConcentration, getTopicConcentration, seed, + checkpointInterval) + var iter = 0 + val iterationTimes = Array.fill[Double](maxIterations)(0) + while (iter < maxIterations) { + val start = System.nanoTime() + state.next() + val elapsedSeconds = (System.nanoTime() - start) / 1e9 + iterationTimes(iter) = elapsedSeconds + iter += 1 } + state.graphCheckpointer.deleteAllCheckpoints() + new DistributedLDAModel(state, iterationTimes) + } + + def runOnlineLDA(documents: RDD[(Long, Vector)]): LDAModel = { + val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k) + (0 until onlineLDA.batchNumber).map(_ => onlineLDA.next()) + new LocalLDAModel(Matrices.fromBreeze(onlineLDA.lambda).transpose) } /** Java-friendly version of [[run()]] */ - def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = { + def run(documents: JavaPairRDD[java.lang.Long, Vector]): DistributedLDAModel = { run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) } } @@ -418,42 +406,48 @@ private[clustering] object LDA { } - // todo: add reference to paper and Hoffman + /** + * Optimizer for Online LDA algorithm which breaks corpus into mini-batches and scans only once. + * Hoffman, Blei and Bach, “Online Learning for Latent Dirichlet Allocation.” NIPS, 2010. 
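+   * (editor's note, not in the original commit) The optimizer maintains lambda, a K x V matrix of
+   * variational parameters for the per-topic term distributions, together with the cached
+   * quantities Elogbeta = E[log beta | lambda] and expElogbeta = exp(Elogbeta) that every
+   * mini-batch E step reuses.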
+ */ private[clustering] class OnlineLDAOptimizer( - val documents: RDD[(Long, Vector)], - val k: Int, - val vocabSize: Int) extends Serializable{ + private val documents: RDD[(Long, Vector)], + private val k: Int) extends Serializable{ - private val kappa = 0.5 // (0.5, 1] how quickly old information is forgotten - private val tau0 = 1024 // down weights early iterations - private val D = documents.count() + private val vocabSize = documents.first._2.size + private val D = documents.count().toInt private val batchSize = if (D / 1000 > 4096) 4096 else if (D / 1000 < 4) 4 else D / 1000 - val batchNumber = (D/batchSize + 1).toInt - private val batches = documents.sliding(batchNumber).collect() + val batchNumber = D/batchSize // Initialize the variational distribution q(beta|lambda) - var _lambda = getGammaMatrix(k, vocabSize) // K * V - private var _Elogbeta = dirichlet_expectation(_lambda) // K * V - private var _expElogbeta = exp(_Elogbeta) // K * V + var lambda = getGammaMatrix(k, vocabSize) // K * V + private var Elogbeta = dirichlet_expectation(lambda) // K * V + private var expElogbeta = exp(Elogbeta) // K * V - private var batchCount = 0 + private var batchId = 0 def next(): Unit = { - // weight of the mini-batch. - val rhot = math.pow(tau0 + batchCount, -kappa) + require(batchId < batchNumber) + // weight of the mini-batch. 1024 down weights early iterations + val weight = math.pow(1024 + batchId, -0.5) + val batch = documents.filter(doc => doc._1 % batchNumber == batchId) + // Given a mini-batch of documents, estimates the parameters gamma controlling the + // variational distribution over the topic weights for each document in the mini-batch. var stat = BDM.zeros[Double](k, vocabSize) - stat = batches(batchCount).aggregate(stat)(seqOp, _ += _) - - stat = stat :* _expElogbeta - _lambda = _lambda * (1 - rhot) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * rhot - _Elogbeta = dirichlet_expectation(_lambda) - _expElogbeta = exp(_Elogbeta) - batchCount += 1 + stat = batch.aggregate(stat)(seqOp, _ += _) + stat = stat :* expElogbeta + + // Update lambda based on documents. 
+ lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * weight + Elogbeta = dirichlet_expectation(lambda) + expElogbeta = exp(Elogbeta) + batchId += 1 } - private def seqOp(other: BDM[Double], doc: (Long, Vector)): BDM[Double] = { + // for each document d update that document's gamma and phi + private def seqOp(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = { val termCounts = doc._2 val (ids, cts) = termCounts match { case v: DenseVector => (((0 until v.size).toList), v.values) @@ -461,15 +455,17 @@ private[clustering] object LDA { case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) } + // Initialize the variational distribution q(theta|gamma) for the mini-batch var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K var Elogthetad = vector_dirichlet_expectation(gammad.t).t // 1 * K var expElogthetad = exp(Elogthetad.t).t // 1 * K - val expElogbetad = _expElogbeta(::, ids).toDenseMatrix // K * ids + val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids var meanchange = 1D - val ctsVector = new BDV[Double](cts).t // 1 * ids + val ctsVector = new BDV[Double](cts).t // 1 * ids + // Iterate between gamma and phi until convergence while (meanchange > 1e-6) { val lastgamma = gammad // 1*K 1 * ids ids * k @@ -480,22 +476,22 @@ private[clustering] object LDA { meanchange = sum(abs((gammad - lastgamma).t)) / gammad.t.size.toDouble } - val v1 = expElogthetad.t.toDenseMatrix.t - val v2 = (ctsVector / phinorm).t.toDenseMatrix - val outerResult = kron(v1, v2) // K * ids + val m1 = expElogthetad.t.toDenseMatrix.t + val m2 = (ctsVector / phinorm).t.toDenseMatrix + val outerResult = kron(m1, m2) // K * ids for (i <- 0 until ids.size) { - other(::, ids(i)) := (other(::, ids(i)) + outerResult(::, i)) + stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) } - other + stat } - def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ + private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) val temp = gammaRandomGenerator.sample(row * col).toArray (new BDM[Double](col, row, temp)).t } - def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { + private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { val rowSum = sum(alpha(breeze.linalg.*, ::)) val digAlpha = digamma(alpha) val digRowSum = digamma(rowSum) @@ -503,7 +499,7 @@ private[clustering] object LDA { result } - def vector_dirichlet_expectation(v : BDV[Double]): (BDV[Double]) ={ + private def vector_dirichlet_expectation(v : BDV[Double]): (BDV[Double]) ={ digamma(v) - digamma(sum(v)) } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index fbe171b4b1ab1..dc10aa67c7c1f 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -88,7 +88,7 @@ public void distributedLDAModel() { .setMaxIterations(5) .setSeed(12345); - DistributedLDAModel model = (DistributedLDAModel)lda.run(corpus); + DistributedLDAModel model = lda.run(corpus); // Check: basic parameters LocalLDAModel localModel = model.toLocal(); diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index d36fb9b479c67..302d751eb8a94 100644 --- 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -68,7 +68,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { .setSeed(12345) val corpus = sc.parallelize(tinyCorpus, 2) - val model: DistributedLDAModel = lda.run(corpus).asInstanceOf[DistributedLDAModel] + val model: DistributedLDAModel = lda.run(corpus) // Check: basic parameters val localModel = model.toLocal From e271eb1a0f6c329b05d3611abb3def1aeffc900e Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Mon, 2 Mar 2015 23:16:33 +0800 Subject: [PATCH 07/21] remove non ascii --- .../src/main/scala/org/apache/spark/mllib/clustering/LDA.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 76ecdf92f26ed..4a34aedc33b71 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -408,7 +408,7 @@ private[clustering] object LDA { /** * Optimizer for Online LDA algorithm which breaks corpus into mini-batches and scans only once. - * Hoffman, Blei and Bach, “Online Learning for Latent Dirichlet Allocation.” NIPS, 2010. + * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. */ private[clustering] class OnlineLDAOptimizer( private val documents: RDD[(Long, Vector)], From a570c9a5cbdbf0ac7b7a4eae1e3b571e0060e5f0 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Wed, 11 Mar 2015 19:23:13 +0800 Subject: [PATCH 08/21] use sample to pick up batch --- .../apache/spark/mllib/clustering/LDA.scala | 49 ++++++++++++++----- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 76ecdf92f26ed..a3681e34a147d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -247,9 +247,34 @@ class LDA private ( new DistributedLDAModel(state, iterationTimes) } - def runOnlineLDA(documents: RDD[(Long, Vector)]): LDAModel = { - val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k) - (0 until onlineLDA.batchNumber).map(_ => onlineLDA.next()) + + /** + * Learn an LDA model using the given dataset, using online variational Bayes (VB) algorithm. + * Hoffman, Blei and Bach, “Online Learning for Latent Dirichlet Allocation.” NIPS, 2010. + * + * @param documents RDD of documents, which are term (word) count vectors paired with IDs. + * The term count vectors are "bags of words" with a fixed-size vocabulary + * (where the vocabulary size is the length of the vector). + * Document IDs must be unique and >= 0. + * @param batchNumber Number of batches. For each batch, recommendation size is [4, 16384]. + * -1 for automatic batchNumber. 
+ * @return Inferred LDA model + */ + def runOnlineLDA(documents: RDD[(Long, Vector)], batchNumber: Int = -1): LDAModel = { + val D = documents.count().toInt + val batchSize = + if (batchNumber == -1) { // auto mode + if (D / 100 > 16384) 16384 + else if (D / 100 < 4) 4 + else D / 100 + } + else { + require(batchNumber > 0, "batchNumber should be positive or -1") + D / batchNumber + } + + val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, batchSize) + (0 until onlineLDA.actualBatchNumber).map(_ => onlineLDA.next()) new LocalLDAModel(Matrices.fromBreeze(onlineLDA.lambda).transpose) } @@ -411,28 +436,26 @@ private[clustering] object LDA { * Hoffman, Blei and Bach, “Online Learning for Latent Dirichlet Allocation.” NIPS, 2010. */ private[clustering] class OnlineLDAOptimizer( - private val documents: RDD[(Long, Vector)], - private val k: Int) extends Serializable{ + private val documents: RDD[(Long, Vector)], + private val k: Int, + private val batchSize: Int) extends Serializable{ private val vocabSize = documents.first._2.size private val D = documents.count().toInt - private val batchSize = if (D / 1000 > 4096) 4096 - else if (D / 1000 < 4) 4 - else D / 1000 - val batchNumber = D/batchSize + val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt - // Initialize the variational distribution q(beta|lambda) + //Initialize the variational distribution q(beta|lambda) var lambda = getGammaMatrix(k, vocabSize) // K * V private var Elogbeta = dirichlet_expectation(lambda) // K * V private var expElogbeta = exp(Elogbeta) // K * V private var batchId = 0 def next(): Unit = { - require(batchId < batchNumber) + require(batchId < actualBatchNumber) // weight of the mini-batch. 1024 down weights early iterations val weight = math.pow(1024 + batchId, -0.5) - val batch = documents.filter(doc => doc._1 % batchNumber == batchId) - + val batch = documents.sample(true, batchSize.toDouble / D) + batch.cache() // Given a mini-batch of documents, estimates the parameters gamma controlling the // variational distribution over the topic weights for each document in the mini-batch. var stat = BDM.zeros[Double](k, vocabSize) From 02d037387f32adcddd98858176813f3a66991a38 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 12 Mar 2015 10:43:03 +0800 Subject: [PATCH 09/21] fix style in comment --- .../src/main/scala/org/apache/spark/mllib/clustering/LDA.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 7e902333ebc70..dc4e8f9701837 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -250,7 +250,6 @@ class LDA private ( /** * Learn an LDA model using the given dataset, using online variational Bayes (VB) algorithm. - * Hoffman, Blei and Bach, “Online Learning for Latent Dirichlet Allocation.” NIPS, 2010. * * @param documents RDD of documents, which are term (word) count vectors paired with IDs. 
* The term count vectors are "bags of words" with a fixed-size vocabulary @@ -444,7 +443,7 @@ private[clustering] object LDA { private val D = documents.count().toInt val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt - //Initialize the variational distribution q(beta|lambda) + // Initialize the variational distribution q(beta|lambda) var lambda = getGammaMatrix(k, vocabSize) // K * V private var Elogbeta = dirichlet_expectation(lambda) // K * V private var expElogbeta = exp(Elogbeta) // K * V From f367cc90bcb19773b618ed3ca47b3529d8ee370c Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Mon, 23 Mar 2015 19:35:41 +0800 Subject: [PATCH 10/21] change to optimization --- .../apache/spark/mllib/clustering/LDA.scala | 75 ++++++++++--------- 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index dc4e8f9701837..0c69aa6d7d937 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -249,32 +249,24 @@ class LDA private ( /** + * TODO: add API to take documents paths once tokenizer is ready. * Learn an LDA model using the given dataset, using online variational Bayes (VB) algorithm. * * @param documents RDD of documents, which are term (word) count vectors paired with IDs. * The term count vectors are "bags of words" with a fixed-size vocabulary * (where the vocabulary size is the length of the vector). * Document IDs must be unique and >= 0. - * @param batchNumber Number of batches. For each batch, recommendation size is [4, 16384]. - * -1 for automatic batchNumber. + * @param batchNumber Number of batches to split input corpus. For each batch, recommendation + * size is [4, 16384]. -1 for automatic batchNumber. 
* @return Inferred LDA model */ def runOnlineLDA(documents: RDD[(Long, Vector)], batchNumber: Int = -1): LDAModel = { - val D = documents.count().toInt - val batchSize = - if (batchNumber == -1) { // auto mode - if (D / 100 > 16384) 16384 - else if (D / 100 < 4) 4 - else D / 100 - } - else { - require(batchNumber > 0, "batchNumber should be positive or -1") - D / batchNumber - } + require(batchNumber > 0 || batchNumber == -1, + s"batchNumber must be greater or -1, but was set to $batchNumber") - val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, batchSize) - (0 until onlineLDA.actualBatchNumber).map(_ => onlineLDA.next()) - new LocalLDAModel(Matrices.fromBreeze(onlineLDA.lambda).transpose) + val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, batchNumber) + val model = onlineLDA.optimize() + new LocalLDAModel(Matrices.fromBreeze(model).transpose) } /** Java-friendly version of [[run()]] */ @@ -437,39 +429,54 @@ private[clustering] object LDA { private[clustering] class OnlineLDAOptimizer( private val documents: RDD[(Long, Vector)], private val k: Int, - private val batchSize: Int) extends Serializable{ + private val batchNumber: Int) extends Serializable{ private val vocabSize = documents.first._2.size private val D = documents.count().toInt - val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt + private val batchSize = + if (batchNumber == -1) { // auto mode + if (D / 100 > 16384) 16384 + else if (D / 100 < 4) 4 + else D / 100 + } + else { + D / batchNumber + } // Initialize the variational distribution q(beta|lambda) - var lambda = getGammaMatrix(k, vocabSize) // K * V + private var lambda = getGammaMatrix(k, vocabSize) // K * V private var Elogbeta = dirichlet_expectation(lambda) // K * V private var expElogbeta = exp(Elogbeta) // K * V - private var batchId = 0 - def next(): Unit = { - require(batchId < actualBatchNumber) - // weight of the mini-batch. 1024 down weights early iterations - val weight = math.pow(1024 + batchId, -0.5) - val batch = documents.sample(true, batchSize.toDouble / D) - batch.cache() - // Given a mini-batch of documents, estimates the parameters gamma controlling the - // variational distribution over the topic weights for each document in the mini-batch. - var stat = BDM.zeros[Double](k, vocabSize) - stat = batch.aggregate(stat)(seqOp, _ += _) - stat = stat :* expElogbeta + def optimize(): BDM[Double] = { + val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt + for(i <- 1 to actualBatchNumber){ + val batch = documents.sample(true, batchSize.toDouble / D) + + // Given a mini-batch of documents, estimates the parameters gamma controlling the + // variational distribution over the topic weights for each document in the mini-batch. + var stat = BDM.zeros[Double](k, vocabSize) + stat = batch.treeAggregate(stat)(gradient, _ += _) + update(stat, i) + } + lambda + } + + private def update(raw: BDM[Double], iter:Int): Unit ={ + // weight of the mini-batch. 1024 helps down weights early iterations + val weight = math.pow(1024 + iter, -0.5) + + // This step finishes computing the sufficient statistics for the M step + val stat = raw :* expElogbeta // Update lambda based on documents. 
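      // (editor's note, not in the original commit) Equivalently:
      //   lambda <- (1 - weight) * lambda + weight * (D / batchSize * stat + 1.0 / k)
      // a convex combination of the old lambda and this mini-batch's estimate, with the batch
      // statistics scaled up to the full corpus of D documents and 1.0 / k playing the role of
      // the topic Dirichlet prior eta.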
lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * weight Elogbeta = dirichlet_expectation(lambda) expElogbeta = exp(Elogbeta) - batchId += 1 } // for each document d update that document's gamma and phi - private def seqOp(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = { + private def gradient(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = { val termCounts = doc._2 val (ids, cts) = termCounts match { case v: DenseVector => (((0 until v.size).toList), v.values) @@ -488,7 +495,7 @@ private[clustering] object LDA { val ctsVector = new BDV[Double](cts).t // 1 * ids // Iterate between gamma and phi until convergence - while (meanchange > 1e-6) { + while (meanchange > 1e-5) { val lastgamma = gammad // 1*K 1 * ids ids * k gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + 1.0/k From e7bf3b017596e2e613ac102ffa5dc58bde4d1c66 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Fri, 27 Mar 2015 17:43:43 +0800 Subject: [PATCH 11/21] move to seperate file --- .../apache/spark/mllib/clustering/LDA.scala | 139 +----------- .../spark/mllib/clustering/OnlineLDA.scala | 205 ++++++++++++++++++ 2 files changed, 207 insertions(+), 137 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 0c69aa6d7d937..5e17c8da61134 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -19,9 +19,7 @@ package org.apache.spark.mllib.clustering import java.util.Random -import breeze.linalg.{DenseVector => BDV, normalize, kron, sum, axpy => brzAxpy, DenseMatrix => BDM} -import breeze.numerics.{exp, abs, digamma} -import breeze.stats.distributions.Gamma +import breeze.linalg.{DenseVector => BDV, normalize, axpy => brzAxpy} import org.apache.spark.Logging import org.apache.spark.annotation.Experimental @@ -29,7 +27,7 @@ import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer -import org.apache.spark.mllib.linalg.{Vector, DenseVector, SparseVector, Matrices} +import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -247,28 +245,6 @@ class LDA private ( new DistributedLDAModel(state, iterationTimes) } - - /** - * TODO: add API to take documents paths once tokenizer is ready. - * Learn an LDA model using the given dataset, using online variational Bayes (VB) algorithm. - * - * @param documents RDD of documents, which are term (word) count vectors paired with IDs. - * The term count vectors are "bags of words" with a fixed-size vocabulary - * (where the vocabulary size is the length of the vector). - * Document IDs must be unique and >= 0. - * @param batchNumber Number of batches to split input corpus. For each batch, recommendation - * size is [4, 16384]. -1 for automatic batchNumber. 
- * @return Inferred LDA model - */ - def runOnlineLDA(documents: RDD[(Long, Vector)], batchNumber: Int = -1): LDAModel = { - require(batchNumber > 0 || batchNumber == -1, - s"batchNumber must be greater or -1, but was set to $batchNumber") - - val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, batchNumber) - val model = onlineLDA.optimize() - new LocalLDAModel(Matrices.fromBreeze(model).transpose) - } - /** Java-friendly version of [[run()]] */ def run(documents: JavaPairRDD[java.lang.Long, Vector]): DistributedLDAModel = { run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) @@ -422,117 +398,6 @@ private[clustering] object LDA { } - /** - * Optimizer for Online LDA algorithm which breaks corpus into mini-batches and scans only once. - * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. - */ - private[clustering] class OnlineLDAOptimizer( - private val documents: RDD[(Long, Vector)], - private val k: Int, - private val batchNumber: Int) extends Serializable{ - - private val vocabSize = documents.first._2.size - private val D = documents.count().toInt - private val batchSize = - if (batchNumber == -1) { // auto mode - if (D / 100 > 16384) 16384 - else if (D / 100 < 4) 4 - else D / 100 - } - else { - D / batchNumber - } - - // Initialize the variational distribution q(beta|lambda) - private var lambda = getGammaMatrix(k, vocabSize) // K * V - private var Elogbeta = dirichlet_expectation(lambda) // K * V - private var expElogbeta = exp(Elogbeta) // K * V - - def optimize(): BDM[Double] = { - val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt - for(i <- 1 to actualBatchNumber){ - val batch = documents.sample(true, batchSize.toDouble / D) - - // Given a mini-batch of documents, estimates the parameters gamma controlling the - // variational distribution over the topic weights for each document in the mini-batch. - var stat = BDM.zeros[Double](k, vocabSize) - stat = batch.treeAggregate(stat)(gradient, _ += _) - update(stat, i) - } - lambda - } - - private def update(raw: BDM[Double], iter:Int): Unit ={ - // weight of the mini-batch. 1024 helps down weights early iterations - val weight = math.pow(1024 + iter, -0.5) - - // This step finishes computing the sufficient statistics for the M step - val stat = raw :* expElogbeta - - // Update lambda based on documents. 
- lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * weight - Elogbeta = dirichlet_expectation(lambda) - expElogbeta = exp(Elogbeta) - } - - // for each document d update that document's gamma and phi - private def gradient(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = { - val termCounts = doc._2 - val (ids, cts) = termCounts match { - case v: DenseVector => (((0 until v.size).toList), v.values) - case v: SparseVector => (v.indices.toList, v.values) - case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) - } - - // Initialize the variational distribution q(theta|gamma) for the mini-batch - var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K - var Elogthetad = vector_dirichlet_expectation(gammad.t).t // 1 * K - var expElogthetad = exp(Elogthetad.t).t // 1 * K - val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids - - var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids - var meanchange = 1D - val ctsVector = new BDV[Double](cts).t // 1 * ids - - // Iterate between gamma and phi until convergence - while (meanchange > 1e-5) { - val lastgamma = gammad - // 1*K 1 * ids ids * k - gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + 1.0/k - Elogthetad = vector_dirichlet_expectation(gammad.t).t - expElogthetad = exp(Elogthetad.t).t - phinorm = expElogthetad * expElogbetad + 1e-100 - meanchange = sum(abs((gammad - lastgamma).t)) / gammad.t.size.toDouble - } - - val m1 = expElogthetad.t.toDenseMatrix.t - val m2 = (ctsVector / phinorm).t.toDenseMatrix - val outerResult = kron(m1, m2) // K * ids - for (i <- 0 until ids.size) { - stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) - } - stat - } - - private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ - val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) - val temp = gammaRandomGenerator.sample(row * col).toArray - (new BDM[Double](col, row, temp)).t - } - - private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { - val rowSum = sum(alpha(breeze.linalg.*, ::)) - val digAlpha = digamma(alpha) - val digRowSum = digamma(rowSum) - val result = digAlpha(::, breeze.linalg.*) - digRowSum - result - } - - private def vector_dirichlet_expectation(v : BDV[Double]): (BDV[Double]) ={ - digamma(v) - digamma(sum(v)) - } - } - /** * Compute gamma_{wjk}, a distribution over topics k. */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala new file mode 100644 index 0000000000000..13891bba6573e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, kron, sum} +import breeze.numerics._ +import breeze.stats.distributions.Gamma +import org.apache.spark.annotation.Experimental +import org.apache.spark.mllib.linalg._ +import org.apache.spark.rdd.RDD + + +/** + * :: Experimental :: + * Latent Dirichlet Allocation (LDA), a topic model designed for text documents. + * + * Online LDA breaks the massive corps into mini batches and scans the corpus (doc sets) only + * once. Thus it needs not locally store or collect the documents and can be handily applied to + * streaming document collections. + * + * References: + * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. + */ +@Experimental +object OnlineLDA{ + + /** + * Learns an LDA model from the given data set, using online variational Bayes (VB) algorithm. + * This is just designed as a handy API. For massive corpus, it's recommended to use + * OnlineLDAOptimizer directly and call submitMiniBatch in your application, which can help + * downgrade time and space complexity by not loading the entire corpus. + * + * @param documents RDD of documents, which are term (word) count vectors paired with IDs. + * The term count vectors are "bags of words" with a fixed-size vocabulary + * (where the vocabulary size is the length of the vector). + * Document IDs must be unique and >= 0. + * @param k Number of topics to infer. + * @param batchNumber Number of batches to split input corpus. For each batch, recommendation + * size is [4, 16384]. -1 for automatic batchNumber. + * @return Inferred LDA model + */ + def run(documents: RDD[(Long, Vector)], k: Int, batchNumber: Int = -1): LDAModel = { + require(batchNumber > 0 || batchNumber == -1, + s"batchNumber must be greater or -1, but was set to $batchNumber") + require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k") + + val vocabSize = documents.first._2.size + val D = documents.count().toInt // total documents count + val batchSize = + if (batchNumber == -1) { // auto mode + if (D / 100 > 16384) 16384 + else if (D / 100 < 4) 4 + else D / 100 + } + else { + D / batchNumber + } + + val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize) + val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt + for(i <- 1 to actualBatchNumber){ + val batch = documents.sample(true, batchSize.toDouble / D) + onlineLDA.submitMiniBatch(batch) + } + onlineLDA.getTopicDistribution() + } +} + +/** + * :: Experimental :: + * Latent Dirichlet Allocation (LDA), a topic model designed for text documents. + * + * An online training optimizer for LDA. The Optimizer processes a subset (like 1%) of the corpus + * by each call to submitMiniBatch, and update the term-topic distribution adaptively. User can + * get the result from getTopicDistribution. + * + * References: + * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. 
+ */ +@Experimental +class OnlineLDAOptimizer ( + private var k: Int, + private var D: Int, + private val vocabSize:Int) extends Serializable { + + // Initialize the variational distribution q(beta|lambda) + private var lambda = getGammaMatrix(k, vocabSize) // K * V + private var Elogbeta = dirichlet_expectation(lambda) // K * V + private var expElogbeta = exp(Elogbeta) // K * V + private var i = 0 + + /** + * Submit a a subset (like 1%) of the corpus to the Online LDA model, and it will update + * the topic distribution adaptively for the terms appearing in the subset (minibatch). + * The documents RDD can be discarded after submitMiniBatch finished. + * + * @param documents RDD of documents, which are term (word) count vectors paired with IDs. + * The term count vectors are "bags of words" with a fixed-size vocabulary + * (where the vocabulary size is the length of the vector). + * Document IDs must be unique and >= 0. + * @return Inferred LDA model + */ + def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = { + var stat = BDM.zeros[Double](k, vocabSize) + stat = documents.treeAggregate(stat)(gradient, _ += _) + update(stat, i, documents.count().toInt) + i += 1 + } + + /** + * get the topic-term distribution + */ + def getTopicDistribution(): LDAModel ={ + new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) + } + + private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={ + // weight of the mini-batch. 1024 helps down weights early iterations + val weight = math.pow(1024 + iter, -0.5) + + // This step finishes computing the sufficient statistics for the M step + val stat = raw :* expElogbeta + + // Update lambda based on documents. + lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * weight + Elogbeta = dirichlet_expectation(lambda) + expElogbeta = exp(Elogbeta) + } + + // for each document d update that document's gamma and phi + private def gradient(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = { + val termCounts = doc._2 + val (ids, cts) = termCounts match { + case v: DenseVector => (((0 until v.size).toList), v.values) + case v: SparseVector => (v.indices.toList, v.values) + case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) + } + + // Initialize the variational distribution q(theta|gamma) for the mini-batch + var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K + var Elogthetad = vector_dirichlet_expectation(gammad.t).t // 1 * K + var expElogthetad = exp(Elogthetad.t).t // 1 * K + val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids + + var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids + var meanchange = 1D + val ctsVector = new BDV[Double](cts).t // 1 * ids + + // Iterate between gamma and phi until convergence + while (meanchange > 1e-5) { + val lastgamma = gammad + // 1*K 1 * ids ids * k + gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + 1.0/k + Elogthetad = vector_dirichlet_expectation(gammad.t).t + expElogthetad = exp(Elogthetad.t).t + phinorm = expElogthetad * expElogbetad + 1e-100 + meanchange = sum(abs((gammad - lastgamma).t)) / gammad.t.size.toDouble + } + + val m1 = expElogthetad.t.toDenseMatrix.t + val m2 = (ctsVector / phinorm).t.toDenseMatrix + val outerResult = kron(m1, m2) // K * ids + for (i <- 0 until ids.size) { + stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) + } + stat + } + + private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ + val gammaRandomGenerator = new Gamma(100, 
1.0 / 100.0) + val temp = gammaRandomGenerator.sample(row * col).toArray + (new BDM[Double](col, row, temp)).t + } + + private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { + val rowSum = sum(alpha(breeze.linalg.*, ::)) + val digAlpha = digamma(alpha) + val digRowSum = digamma(rowSum) + val result = digAlpha(::, breeze.linalg.*) - digRowSum + result + } + + private def vector_dirichlet_expectation(v : BDV[Double]): (BDV[Double]) ={ + digamma(v) - digamma(sum(v)) + } +} + + + + From d19ef558433b7b062b40332ea38ac869fb7eb0d5 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 2 Apr 2015 19:28:15 +0800 Subject: [PATCH 12/21] change OnlineLDA to class --- .../spark/mllib/clustering/OnlineLDA.scala | 103 ++++++++++++------ 1 file changed, 72 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala index 13891bba6573e..72c550144db0a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala @@ -24,7 +24,6 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg._ import org.apache.spark.rdd.RDD - /** * :: Experimental :: * Latent Dirichlet Allocation (LDA), a topic model designed for text documents. @@ -37,7 +36,58 @@ import org.apache.spark.rdd.RDD * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. */ @Experimental -object OnlineLDA{ +class OnlineLDA( + private var k: Int, + private var numIterations: Int, + private var miniBatchFraction: Double, + private var tau_0: Double, + private var kappa: Double) { + + def this() = this(k = 10, numIterations = 100, miniBatchFraction = 0.01, + tau_0 = 1024, kappa = 0.5) + + /** + * Number of topics to infer. I.e., the number of soft cluster centers. + * (default = 10) + */ + def setK(k: Int): this.type = { + require(k > 0, s"OnlineLDA k (number of clusters) must be > 0, but was set to $k") + this.k = k + this + } + + /** + * Set the number of iterations for OnlineLDA. Default 100. + */ + def setNumIterations(iters: Int): this.type = { + this.numIterations = iters + this + } + + /** + * Set fraction of data to be used for each iteration. Default 0.01. + */ + def setMiniBatchFraction(fraction: Double): this.type = { + this.miniBatchFraction = fraction + this + } + + /** + * A (positive) learning parameter that downweights early iterations. Default 1024. + */ + def setTau_0(t: Double): this.type = { + this.tau_0 = t + this + } + + /** + * Learning rate: exponential decay rate. Default 0.5. + */ + def setKappa(kappa: Double): this.type = { + this.kappa = kappa + this + } + /** * Learns an LDA model from the given data set, using online variational Bayes (VB) algorithm. @@ -49,33 +99,18 @@ object OnlineLDA{ * The term count vectors are "bags of words" with a fixed-size vocabulary * (where the vocabulary size is the length of the vector). * Document IDs must be unique and >= 0. - * @param k Number of topics to infer. - * @param batchNumber Number of batches to split input corpus. For each batch, recommendation - * size is [4, 16384]. -1 for automatic batchNumber. 
* @return Inferred LDA model */ - def run(documents: RDD[(Long, Vector)], k: Int, batchNumber: Int = -1): LDAModel = { - require(batchNumber > 0 || batchNumber == -1, - s"batchNumber must be greater or -1, but was set to $batchNumber") - require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k") - + def run(documents: RDD[(Long, Vector)]): LDAModel = { val vocabSize = documents.first._2.size val D = documents.count().toInt // total documents count - val batchSize = - if (batchNumber == -1) { // auto mode - if (D / 100 > 16384) 16384 - else if (D / 100 < 4) 4 - else D / 100 - } - else { - D / batchNumber - } - - val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize) - val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt - for(i <- 1 to actualBatchNumber){ - val batch = documents.sample(true, batchSize.toDouble / D) - onlineLDA.submitMiniBatch(batch) + val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize, tau_0, kappa) + + val arr = Array.fill(math.ceil(1.0 / miniBatchFraction).toInt)(miniBatchFraction) + for(i <- 0 until numIterations){ + val splits = documents.randomSplit(arr) + val index = i % splits.size + onlineLDA.submitMiniBatch(splits(index)) } onlineLDA.getTopicDistribution() } @@ -93,10 +128,12 @@ object OnlineLDA{ * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. */ @Experimental -class OnlineLDAOptimizer ( +private[clustering] class OnlineLDAOptimizer ( private var k: Int, private var D: Int, - private val vocabSize:Int) extends Serializable { + private val vocabSize: Int, + private val tau_0: Double, + private val kappa: Double) extends Serializable { // Initialize the variational distribution q(beta|lambda) private var lambda = getGammaMatrix(k, vocabSize) // K * V @@ -115,7 +152,11 @@ class OnlineLDAOptimizer ( * Document IDs must be unique and >= 0. * @return Inferred LDA model */ - def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = { + private[clustering] def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = { + if(documents.isEmpty()){ + return + } + var stat = BDM.zeros[Double](k, vocabSize) stat = documents.treeAggregate(stat)(gradient, _ += _) update(stat, i, documents.count().toInt) @@ -125,13 +166,13 @@ class OnlineLDAOptimizer ( /** * get the topic-term distribution */ - def getTopicDistribution(): LDAModel ={ + private[clustering] def getTopicDistribution(): LDAModel ={ new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) } private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={ - // weight of the mini-batch. 1024 helps down weights early iterations - val weight = math.pow(1024 + iter, -0.5) + // weight of the mini-batch. 
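+    // (editor's note, not in the original commit) i.e. rho_i = (tau_0 + i)^(-kappa); unlike the
+    // earlier hard-coded 1024 and 0.5, tau_0 and kappa are now supplied through OnlineLDA's
+    // setTau_0 and setKappa.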
+ val weight = math.pow(tau_0 + iter, -kappa) // This step finishes computing the sufficient statistics for the M step val stat = raw :* expElogbeta From b1178cfaad979f26ce02648795af65ecf08685c9 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Tue, 28 Apr 2015 22:01:41 +0800 Subject: [PATCH 13/21] fit into the optimizer framework --- .../apache/spark/mllib/clustering/LDA.scala | 38 +-- .../spark/mllib/clustering/LDAOptimizer.scala | 288 ++++++++++++++++-- .../spark/mllib/clustering/OnlineLDA.scala | 246 --------------- 3 files changed, 274 insertions(+), 298 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 37bf88b73b911..92e343fbcba3e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -78,21 +78,15 @@ class LDA private ( * * This is the parameter to a symmetric Dirichlet distribution. */ - def getDocConcentration: Double = { - if (this.docConcentration == -1) { - (50.0 / k) + 1.0 - } else { - this.docConcentration - } - } + def getDocConcentration: Double = this.docConcentration /** * Concentration parameter (commonly named "alpha") for the prior placed on documents' * distributions over topics ("theta"). * - * This is the parameter to a symmetric Dirichlet distribution. + * This is the parameter to a symmetric Dirichlet distribution, where larger values + * mean more smoothing (more regularization). * - * This value should be > 1.0, where larger values mean more smoothing (more regularization). * If set to -1, then docConcentration is set automatically. * (default = -1 = automatic) * @@ -100,13 +94,12 @@ class LDA private ( * - For EM: default = (50 / k) + 1. * - The 50/k is common in LDA libraries. * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - For Online: default = (1.0 / k). + * - follows the implementation from: https://github.com/Blei-Lab/onlineldavb. * - * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions), - * but values in (0,1) are not yet supported. + * Note: For EM optimizer, This value should be > 1.0. */ def setDocConcentration(docConcentration: Double): this.type = { - require(docConcentration > 1.0 || docConcentration == -1.0, - s"LDA docConcentration must be > 1.0 (or -1 for auto), but was set to $docConcentration") this.docConcentration = docConcentration this } @@ -126,13 +119,7 @@ class LDA private ( * Note: The topics' distributions over terms are called "beta" in the original LDA paper * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009. */ - def getTopicConcentration: Double = { - if (this.topicConcentration == -1) { - 1.1 - } else { - this.topicConcentration - } - } + def getTopicConcentration: Double = this.topicConcentration /** * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics' @@ -143,7 +130,6 @@ class LDA private ( * Note: The topics' distributions over terms are called "beta" in the original LDA paper * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009. * - * This value should be > 0.0. * If set to -1, then topicConcentration is set automatically. * (default = -1 = automatic) * @@ -151,13 +137,12 @@ class LDA private ( * - For EM: default = 0.1 + 1. 
* - The 0.1 gives a small amount of smoothing. * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - For Online: default = (1.0 / k). + * - follows the implementation from: https://github.com/Blei-Lab/onlineldavb. * - * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions), - * but values in (0,1) are not yet supported. + * Note: For EM optimizer, This value should be > 1.0. */ def setTopicConcentration(topicConcentration: Double): this.type = { - require(topicConcentration > 1.0 || topicConcentration == -1.0, - s"LDA topicConcentration must be > 1.0 (or -1 for auto), but was set to $topicConcentration") this.topicConcentration = topicConcentration this } @@ -245,8 +230,7 @@ class LDA private ( * @return Inferred LDA model */ def run(documents: RDD[(Long, Vector)]): LDAModel = { - val state = ldaOptimizer.initialState(documents, k, getDocConcentration, getTopicConcentration, - seed, checkpointInterval) + val state = ldaOptimizer.initialize(documents, this) var iter = 0 val iterationTimes = Array.fill[Double](maxIterations)(0) while (iter < maxIterations) { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index ffd72a294c6c6..5b6d226cea2b3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -19,13 +19,15 @@ package org.apache.spark.mllib.clustering import java.util.Random -import breeze.linalg.{DenseVector => BDV, normalize} +import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum, normalize, kron} +import breeze.numerics.{digamma, exp, abs} +import breeze.stats.distributions.Gamma import org.apache.spark.annotation.Experimental import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{Matrices, SparseVector, DenseVector, Vector} import org.apache.spark.rdd.RDD /** @@ -35,7 +37,7 @@ import org.apache.spark.rdd.RDD * hold optimizer-specific parameters for users to set. */ @Experimental -trait LDAOptimizer{ +trait LDAOptimizer { /* DEVELOPERS NOTE: @@ -49,13 +51,7 @@ trait LDAOptimizer{ * Initializer for the optimizer. LDA passes the common parameters to the optimizer and * the internal structure can be initialized properly. */ - private[clustering] def initialState( - docs: RDD[(Long, Vector)], - k: Int, - docConcentration: Double, - topicConcentration: Double, - randomSeed: Long, - checkpointInterval: Int): LDAOptimizer + private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer private[clustering] def next(): LDAOptimizer @@ -80,12 +76,12 @@ trait LDAOptimizer{ * */ @Experimental -class EMLDAOptimizer extends LDAOptimizer{ +class EMLDAOptimizer extends LDAOptimizer { import LDA._ /** - * Following fields will only be initialized through initialState method + * Following fields will only be initialized through initialize method */ private[clustering] var graph: Graph[TopicCounts, TokenCount] = null private[clustering] var k: Int = 0 @@ -98,13 +94,38 @@ class EMLDAOptimizer extends LDAOptimizer{ /** * Compute bipartite term/doc graph. 
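The graph the initializer below constructs is exactly that: one vertex per document, one vertex per term, and an edge for every non-zero (document, term) count, weighted by the token count. A rough illustration of the shapes involved, assuming document vertices keep their user-supplied non-negative IDs and term vertices get IDs from a disjoint range (the actual term-to-VertexId mapping lives elsewhere in LDA.scala and is not shown in this patch):

    import org.apache.spark.graphx.Edge

    // Illustrative only: "document 3 contains term 6 twice" becomes one weighted edge.
    val docId: Long = 3L
    val termVertexId: Long = -7L   // assumed encoding; only its disjointness from doc IDs matters here
    val tokenCount: Double = 2.0
    val edge: Edge[Double] = Edge(docId, termVertexId, tokenCount)

The EdgePartition1D partitioning applied afterwards then groups each document's edges into the same partition, as the code comment notes.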
*/ - private[clustering] override def initialState( - docs: RDD[(Long, Vector)], - k: Int, - docConcentration: Double, - topicConcentration: Double, - randomSeed: Long, - checkpointInterval: Int): LDAOptimizer = { + private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): + LDAOptimizer = { + + val docConcentration = lda.getDocConcentration + val topicConcentration = lda.getTopicConcentration + val k = lda.getK + + /** + * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions), + * but values in (0,1) are not yet supported. + */ + require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" + + s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration") + require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration " + + s"must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration") + + /** + * - For EM: default = (50 / k) + 1. + * - The 50/k is common in LDA libraries. + * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. + */ + this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration + + /** + * - For EM: default = 0.1 + 1. + * - The 0.1 gives a small amount of smoothing. + * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. + */ + this.topicConcentration = if (topicConcentration == -1) 1.1 else topicConcentration + + val randomSeed = lda.getSeed + // For each document, create an edge (Document -> Term) for each unique term in the document. val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) => // Add edges for terms with non-zero counts. @@ -113,8 +134,6 @@ class EMLDAOptimizer extends LDAOptimizer{ } } - val vocabSize = docs.take(1).head._2.size - // Create vertices. // Initially, we use random soft assignments of tokens to topics (random gamma). def createVertices(): RDD[(VertexId, TopicCounts)] = { @@ -135,10 +154,8 @@ class EMLDAOptimizer extends LDAOptimizer{ // Partition such that edges are grouped by document this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) this.k = k - this.vocabSize = vocabSize - this.docConcentration = docConcentration - this.topicConcentration = topicConcentration - this.checkpointInterval = checkpointInterval + this.vocabSize = docs.take(1).head._2.size + this.checkpointInterval = lda.getCheckpointInterval this.graphCheckpointer = new PeriodicGraphCheckpointer[TopicCounts, TokenCount](graph, checkpointInterval) this.globalTopicTotals = computeGlobalTopicTotals() @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{ new DistributedLDAModel(this, iterationTimes) } } + + +/** + * :: Experimental :: + * + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which + * processes a subset of the corpus by each call to next, and update the term-topic + * distribution adaptively. + * + * References: + * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. 
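In practice each call to next() on this optimizer does three things: sample a mini-batch of documents, run a per-document variational E-step against the current expected topics, and blend the resulting sufficient statistics into lambda with a decaying step size. A self-contained sketch of that final blending step, with parameter names mirroring the fields and the update() method defined below ("blend" itself is a name introduced only for this sketch, not part of the class):

    import breeze.linalg.{DenseMatrix => BDM}

    /** One online M-step: fold mini-batch statistics `stat` into the topic matrix `lambda`. */
    def blend(lambda: BDM[Double], stat: BDM[Double], eta: Double, D: Int, batchSize: Int,
        tau_0: Double, kappa: Double, iter: Int): BDM[Double] = {
      // Decaying step size: tau_0 downweights early mini-batches, kappa controls the decay rate.
      val weight = math.pow(tau_0 + iter, -kappa)
      // Rescale the batch statistics from batchSize documents to the full corpus of D documents,
      // smooth with the topic concentration eta, and mix with the previous lambda.
      lambda * (1 - weight) + (stat * (D.toDouble / batchSize.toDouble) + eta) * weight
    }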
+ */ +@Experimental +class OnlineLDAOptimizer extends LDAOptimizer { + + // LDA common parameters + private var k: Int = 0 + private var D: Int = 0 + private var vocabSize: Int = 0 + private var alpha: Double = 0 + private var eta: Double = 0 + private var randomSeed: Long = 0 + + // Online LDA specific parameters + private var tau_0: Double = -1 + private var kappa: Double = -1 + private var batchSize: Int = -1 + + // internal data structure + private var docs: RDD[(Long, Vector)] = null + private var lambda: BDM[Double] = null + private var Elogbeta: BDM[Double]= null + private var expElogbeta: BDM[Double] = null + + // count of invocation to next, used to help deciding the weight for each iteration + private var iteration = 0 + + /** + * A (positive) learning parameter that downweights early iterations + */ + def getTau_0: Double = { + if (this.tau_0 == -1) { + 1024 + } else { + this.tau_0 + } + } + + /** + * A (positive) learning parameter that downweights early iterations + * Automatic setting of parameter: + * - default = 1024, which follows the recommendation from OnlineLDA paper. + */ + def setTau_0(tau_0: Double): this.type = { + require(tau_0 > 0 || tau_0 == -1.0, s"LDA tau_0 must be positive, but was set to $tau_0") + this.tau_0 = tau_0 + this + } + + /** + * Learning rate: exponential decay rate + */ + def getKappa: Double = { + if (this.kappa == -1) { + 0.5 + } else { + this.kappa + } + } + + /** + * Learning rate: exponential decay rate---should be between + * (0.5, 1.0] to guarantee asymptotic convergence. + * - default = 0.5, which follows the recommendation from OnlineLDA paper. + */ + def setKappa(kappa: Double): this.type = { + require(kappa >= 0 || kappa == -1.0, + s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa") + this.kappa = kappa + this + } + + /** + * Mini-batch size, which controls how many documents are used in each iteration + */ + def getBatchSize: Int = { + if (this.batchSize == -1) { + D / 100 + } else { + this.batchSize + } + } + + /** + * Mini-batch size, which controls how many documents are used in each iteration + * default = 1% from total documents. + */ + def setBatchSize(batchSize: Int): this.type = { + this.batchSize = batchSize + this + } + + private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { + + this.k = lda.getK + this.D = docs.count().toInt + this.vocabSize = docs.first()._2.size + this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration + this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration + this.randomSeed = randomSeed + + this.docs = docs + + // Initialize the variational distribution q(beta|lambda) + this.lambda = getGammaMatrix(k, vocabSize) + this.Elogbeta = dirichlet_expectation(lambda) + this.expElogbeta = exp(Elogbeta) + this.iteration = 0 + this + } + + /** + * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model, + * and it will update the topic distribution adaptively for the terms appearing in the subset. 
+ * + * @return Inferred LDA model + */ + private[clustering] override def next(): OnlineLDAOptimizer = { + iteration += 1 + val batchSize = getBatchSize + val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache() + if(batch.isEmpty()) return this + + val k = this.k + val vocabSize = this.vocabSize + val expElogbeta = this.expElogbeta + val alpha = this.alpha + + val stats = batch.mapPartitions(docs =>{ + val stat = BDM.zeros[Double](k, vocabSize) + docs.foreach(doc =>{ + val termCounts = doc._2 + val (ids, cts) = termCounts match { + case v: DenseVector => (((0 until v.size).toList), v.values) + case v: SparseVector => (v.indices.toList, v.values) + case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) + } + + // Initialize the variational distribution q(theta|gamma) for the mini-batch + var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K + var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 * K + var expElogthetad = exp(Elogthetad) // 1 * K + val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids + + var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids + var meanchange = 1D + val ctsVector = new BDV[Double](cts).t // 1 * ids + + // Iterate between gamma and phi until convergence + while (meanchange > 1e-5) { + val lastgamma = gammad + // 1*K 1 * ids ids * k + gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha + Elogthetad = digamma(gammad) - digamma(sum(gammad)) + expElogthetad = exp(Elogthetad) + phinorm = expElogthetad * expElogbetad + 1e-100 + meanchange = sum(abs(gammad - lastgamma)) / k + } + + val m1 = expElogthetad.t.toDenseMatrix.t + val m2 = (ctsVector / phinorm).t.toDenseMatrix + val outerResult = kron(m1, m2) // K * ids + for (i <- 0 until ids.size) { + stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) + } + stat + }) + Iterator(stat) + }) + + val batchResult = stats.reduce(_ += _) + update(batchResult, iteration, batchSize) + batch.unpersist() + this + } + + private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = { + new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) + } + + private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={ + + val tau_0 = this.getTau_0 + val kappa = this.getKappa + + // weight of the mini-batch. + val weight = math.pow(tau_0 + iter, -kappa) + + // This step finishes computing the sufficient statistics for the M step + val stat = raw :* expElogbeta + + // Update lambda based on documents. 
+ lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight + Elogbeta = dirichlet_expectation(lambda) + expElogbeta = exp(Elogbeta) + } + + private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ + val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) + val temp = gammaRandomGenerator.sample(row * col).toArray + (new BDM[Double](col, row, temp)).t + } + + private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { + val rowSum = sum(alpha(breeze.linalg.*, ::)) + val digAlpha = digamma(alpha) + val digRowSum = digamma(rowSum) + val result = digAlpha(::, breeze.linalg.*) - digRowSum + result + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala deleted file mode 100644 index 72c550144db0a..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.clustering - -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, kron, sum} -import breeze.numerics._ -import breeze.stats.distributions.Gamma -import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg._ -import org.apache.spark.rdd.RDD - -/** - * :: Experimental :: - * Latent Dirichlet Allocation (LDA), a topic model designed for text documents. - * - * Online LDA breaks the massive corps into mini batches and scans the corpus (doc sets) only - * once. Thus it needs not locally store or collect the documents and can be handily applied to - * streaming document collections. - * - * References: - * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. - */ -@Experimental -class OnlineLDA( - private var k: Int, - private var numIterations: Int, - private var miniBatchFraction: Double, - private var tau_0: Double, - private var kappa: Double) { - - def this() = this(k = 10, numIterations = 100, miniBatchFraction = 0.01, - tau_0 = 1024, kappa = 0.5) - - /** - * Number of topics to infer. I.e., the number of soft cluster centers. - * (default = 10) - */ - def setK(k: Int): this.type = { - require(k > 0, s"OnlineLDA k (number of clusters) must be > 0, but was set to $k") - this.k = k - this - } - - /** - * Set the number of iterations for OnlineLDA. Default 100. - */ - def setNumIterations(iters: Int): this.type = { - this.numIterations = iters - this - } - - /** - * Set fraction of data to be used for each iteration. Default 0.01. - */ - def setMiniBatchFraction(fraction: Double): this.type = { - this.miniBatchFraction = fraction - this - } - - /** - * A (positive) learning parameter that downweights early iterations. Default 1024. 
- */ - def setTau_0(t: Double): this.type = { - this.tau_0 = t - this - } - - /** - * Learning rate: exponential decay rate. Default 0.5. - */ - def setKappa(kappa: Double): this.type = { - this.kappa = kappa - this - } - - - /** - * Learns an LDA model from the given data set, using online variational Bayes (VB) algorithm. - * This is just designed as a handy API. For massive corpus, it's recommended to use - * OnlineLDAOptimizer directly and call submitMiniBatch in your application, which can help - * downgrade time and space complexity by not loading the entire corpus. - * - * @param documents RDD of documents, which are term (word) count vectors paired with IDs. - * The term count vectors are "bags of words" with a fixed-size vocabulary - * (where the vocabulary size is the length of the vector). - * Document IDs must be unique and >= 0. - * @return Inferred LDA model - */ - def run(documents: RDD[(Long, Vector)]): LDAModel = { - val vocabSize = documents.first._2.size - val D = documents.count().toInt // total documents count - val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize, tau_0, kappa) - - val arr = Array.fill(math.ceil(1.0 / miniBatchFraction).toInt)(miniBatchFraction) - for(i <- 0 until numIterations){ - val splits = documents.randomSplit(arr) - val index = i % splits.size - onlineLDA.submitMiniBatch(splits(index)) - } - onlineLDA.getTopicDistribution() - } -} - -/** - * :: Experimental :: - * Latent Dirichlet Allocation (LDA), a topic model designed for text documents. - * - * An online training optimizer for LDA. The Optimizer processes a subset (like 1%) of the corpus - * by each call to submitMiniBatch, and update the term-topic distribution adaptively. User can - * get the result from getTopicDistribution. - * - * References: - * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. - */ -@Experimental -private[clustering] class OnlineLDAOptimizer ( - private var k: Int, - private var D: Int, - private val vocabSize: Int, - private val tau_0: Double, - private val kappa: Double) extends Serializable { - - // Initialize the variational distribution q(beta|lambda) - private var lambda = getGammaMatrix(k, vocabSize) // K * V - private var Elogbeta = dirichlet_expectation(lambda) // K * V - private var expElogbeta = exp(Elogbeta) // K * V - private var i = 0 - - /** - * Submit a a subset (like 1%) of the corpus to the Online LDA model, and it will update - * the topic distribution adaptively for the terms appearing in the subset (minibatch). - * The documents RDD can be discarded after submitMiniBatch finished. - * - * @param documents RDD of documents, which are term (word) count vectors paired with IDs. - * The term count vectors are "bags of words" with a fixed-size vocabulary - * (where the vocabulary size is the length of the vector). - * Document IDs must be unique and >= 0. - * @return Inferred LDA model - */ - private[clustering] def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = { - if(documents.isEmpty()){ - return - } - - var stat = BDM.zeros[Double](k, vocabSize) - stat = documents.treeAggregate(stat)(gradient, _ += _) - update(stat, i, documents.count().toInt) - i += 1 - } - - /** - * get the topic-term distribution - */ - private[clustering] def getTopicDistribution(): LDAModel ={ - new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) - } - - private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={ - // weight of the mini-batch. 
- val weight = math.pow(tau_0 + iter, -kappa) - - // This step finishes computing the sufficient statistics for the M step - val stat = raw :* expElogbeta - - // Update lambda based on documents. - lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * weight - Elogbeta = dirichlet_expectation(lambda) - expElogbeta = exp(Elogbeta) - } - - // for each document d update that document's gamma and phi - private def gradient(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = { - val termCounts = doc._2 - val (ids, cts) = termCounts match { - case v: DenseVector => (((0 until v.size).toList), v.values) - case v: SparseVector => (v.indices.toList, v.values) - case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) - } - - // Initialize the variational distribution q(theta|gamma) for the mini-batch - var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K - var Elogthetad = vector_dirichlet_expectation(gammad.t).t // 1 * K - var expElogthetad = exp(Elogthetad.t).t // 1 * K - val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids - - var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids - var meanchange = 1D - val ctsVector = new BDV[Double](cts).t // 1 * ids - - // Iterate between gamma and phi until convergence - while (meanchange > 1e-5) { - val lastgamma = gammad - // 1*K 1 * ids ids * k - gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + 1.0/k - Elogthetad = vector_dirichlet_expectation(gammad.t).t - expElogthetad = exp(Elogthetad.t).t - phinorm = expElogthetad * expElogbetad + 1e-100 - meanchange = sum(abs((gammad - lastgamma).t)) / gammad.t.size.toDouble - } - - val m1 = expElogthetad.t.toDenseMatrix.t - val m2 = (ctsVector / phinorm).t.toDenseMatrix - val outerResult = kron(m1, m2) // K * ids - for (i <- 0 until ids.size) { - stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) - } - stat - } - - private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ - val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) - val temp = gammaRandomGenerator.sample(row * col).toArray - (new BDM[Double](col, row, temp)).t - } - - private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { - val rowSum = sum(alpha(breeze.linalg.*, ::)) - val digAlpha = digamma(alpha) - val digRowSum = digamma(rowSum) - val result = digAlpha(::, breeze.linalg.*) - digRowSum - result - } - - private def vector_dirichlet_expectation(v : BDV[Double]): (BDV[Double]) ={ - digamma(v) - digamma(sum(v)) - } -} - - - - From a996a82e530a9e3d1465bb8872feb0ab70e579a3 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Wed, 29 Apr 2015 09:51:25 +0800 Subject: [PATCH 14/21] respond to comments --- .../apache/spark/mllib/clustering/LDA.scala | 7 +- .../spark/mllib/clustering/LDAOptimizer.scala | 111 +++++++----------- 2 files changed, 48 insertions(+), 70 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 92e343fbcba3e..70120b9d0192c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -95,7 +95,7 @@ class LDA private ( * - The 50/k is common in LDA libraries. * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. * - For Online: default = (1.0 / k). - * - follows the implementation from: https://github.com/Blei-Lab/onlineldavb. 
+ * - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]]. * * Note: For EM optimizer, This value should be > 1.0. */ @@ -117,7 +117,8 @@ class LDA private ( * This is the parameter to a symmetric Dirichlet distribution. * * Note: The topics' distributions over terms are called "beta" in the original LDA paper - * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009. + * by Blei et al., but are ca + * lled "phi" in many later papers such as Asuncion et al., 2009. */ def getTopicConcentration: Double = this.topicConcentration @@ -138,7 +139,7 @@ class LDA private ( * - The 0.1 gives a small amount of smoothing. * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. * - For Online: default = (1.0 / k). - * - follows the implementation from: https://github.com/Blei-Lab/onlineldavb. + * - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]]. * * Note: For EM optimizer, This value should be > 1.0. */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index 5b6d226cea2b3..6d2d93a525a9c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -94,36 +94,21 @@ class EMLDAOptimizer extends LDAOptimizer { /** * Compute bipartite term/doc graph. */ - private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): - LDAOptimizer = { + private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={ val docConcentration = lda.getDocConcentration val topicConcentration = lda.getTopicConcentration val k = lda.getK - /** - * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions), - * but values in (0,1) are not yet supported. - */ + // Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions), + // but values in (0,1) are not yet supported. require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" + s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration") require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration " + s"must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration") - /** - * - For EM: default = (50 / k) + 1. - * - The 50/k is common in LDA libraries. - * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. - */ this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration - - /** - * - For EM: default = 0.1 + 1. - * - The 0.1 gives a small amount of smoothing. - * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. - */ this.topicConcentration = if (topicConcentration == -1) 1.1 else topicConcentration - val randomSeed = lda.getSeed // For each document, create an edge (Document -> Term) for each unique term in the document. @@ -230,8 +215,8 @@ class EMLDAOptimizer extends LDAOptimizer { /** * :: Experimental :: * - * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which - * processes a subset of the corpus by each call to next, and update the term-topic + * An online optimizer for LDA. 
The Optimizer implements the Online variational Bayes LDA + * algorithm, which processes a subset of the corpus on each iteration, and update the term-topic * distribution adaptively. * * References: @@ -242,16 +227,16 @@ class OnlineLDAOptimizer extends LDAOptimizer { // LDA common parameters private var k: Int = 0 - private var D: Int = 0 + private var corpusSize: Long = 0 private var vocabSize: Int = 0 private var alpha: Double = 0 private var eta: Double = 0 - private var randomSeed: Long = 0 + private var randomGenerator: java.util.Random = null // Online LDA specific parameters - private var tau_0: Double = -1 - private var kappa: Double = -1 - private var batchSize: Int = -1 + private var tau_0: Double = 1024 + private var kappa: Double = 0.51 + private var minibatchFraction: Double = 0.01 // internal data structure private var docs: RDD[(Long, Vector)] = null @@ -259,22 +244,18 @@ class OnlineLDAOptimizer extends LDAOptimizer { private var Elogbeta: BDM[Double]= null private var expElogbeta: BDM[Double] = null - // count of invocation to next, used to help deciding the weight for each iteration + // count of invocation to next, which helps deciding the weight for each iteration private var iteration = 0 /** - * A (positive) learning parameter that downweights early iterations + * A (positive) learning parameter that downweights early iterations. Larger values make early + * iterations count less */ - def getTau_0: Double = { - if (this.tau_0 == -1) { - 1024 - } else { - this.tau_0 - } - } + def getTau_0: Double = this.tau_0 /** - * A (positive) learning parameter that downweights early iterations + * A (positive) learning parameter that downweights early iterations. Larger values make early + * iterations count less * Automatic setting of parameter: * - default = 1024, which follows the recommendation from OnlineLDA paper. */ @@ -287,18 +268,12 @@ class OnlineLDAOptimizer extends LDAOptimizer { /** * Learning rate: exponential decay rate */ - def getKappa: Double = { - if (this.kappa == -1) { - 0.5 - } else { - this.kappa - } - } + def getKappa: Double = this.kappa /** * Learning rate: exponential decay rate---should be between * (0.5, 1.0] to guarantee asymptotic convergence. - * - default = 0.5, which follows the recommendation from OnlineLDA paper. + * - default = 0.51, which follows the recommendation from OnlineLDA paper. */ def setKappa(kappa: Double): this.type = { require(kappa >= 0 || kappa == -1.0, @@ -310,52 +285,44 @@ class OnlineLDAOptimizer extends LDAOptimizer { /** * Mini-batch size, which controls how many documents are used in each iteration */ - def getBatchSize: Int = { - if (this.batchSize == -1) { - D / 100 - } else { - this.batchSize - } - } + def getMiniBatchFraction: Double = this.minibatchFraction /** * Mini-batch size, which controls how many documents are used in each iteration * default = 1% from total documents. 
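When setting this, it can help to think in terms of the absolute batch size it implies: each call to next() sees roughly miniBatchFraction * corpusSize documents in expectation. For reference, the earlier batchSize-based API in this PR clamped that count to [4, 16384]; a small helper along those lines (purely a sizing suggestion with a hypothetical name, nothing the optimizer enforces):

    // Hypothetical helper: pick a fraction that yields about `target` documents per mini-batch.
    def fractionForBatchOf(target: Int, corpusSize: Long): Double =
      math.min(1.0, target.toDouble / corpusSize)

    fractionForBatchOf(4096, corpusSize = 2000000L)   // = 0.002048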
*/ - def setBatchSize(batchSize: Int): this.type = { - this.batchSize = batchSize + def setMiniBatchFraction(miniBatchFraction: Double): this.type = { + this.minibatchFraction = miniBatchFraction this } - private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { + private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={ this.k = lda.getK - this.D = docs.count().toInt + this.corpusSize = docs.count() this.vocabSize = docs.first()._2.size this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration - this.randomSeed = randomSeed + this.randomGenerator = new Random(lda.getSeed) this.docs = docs // Initialize the variational distribution q(beta|lambda) this.lambda = getGammaMatrix(k, vocabSize) - this.Elogbeta = dirichlet_expectation(lambda) + this.Elogbeta = dirichletExpectation(lambda) this.expElogbeta = exp(Elogbeta) this.iteration = 0 this } /** - * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model, - * and it will update the topic distribution adaptively for the terms appearing in the subset. - * - * @return Inferred LDA model + * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA + * model, and it will update the topic distribution adaptively for the terms appearing in the + * subset. */ private[clustering] override def next(): OnlineLDAOptimizer = { iteration += 1 - val batchSize = getBatchSize - val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache() + val batch = docs.sample(true, minibatchFraction, randomGenerator.nextLong()) if(batch.isEmpty()) return this val k = this.k @@ -406,8 +373,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { }) val batchResult = stats.reduce(_ += _) - update(batchResult, iteration, batchSize) - batch.unpersist() + update(batchResult, iteration, (minibatchFraction * corpusSize).toInt) this } @@ -415,6 +381,9 @@ class OnlineLDAOptimizer extends LDAOptimizer { new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) } + /** + * Update lambda based on the batch submitted. batchSize can be different for each iteration. + */ private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={ val tau_0 = this.getTau_0 @@ -427,18 +396,26 @@ class OnlineLDAOptimizer extends LDAOptimizer { val stat = raw :* expElogbeta // Update lambda based on documents. - lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight - Elogbeta = dirichlet_expectation(lambda) + lambda = lambda * (1 - weight) + + (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight + Elogbeta = dirichletExpectation(lambda) expElogbeta = exp(Elogbeta) } + /** + * Get a random matrix to initialize lambda + */ private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) val temp = gammaRandomGenerator.sample(row * col).toArray (new BDM[Double](col, row, temp)).t } - private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { + /** + * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation + * uses digamma which is accurate but expensive. 
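For reference, the quantity computed here is the standard Dirichlet expectation: for theta ~ Dir(alpha_1, ..., alpha_K),

    E[log theta_k] = digamma(alpha_k) - digamma(alpha_1 + ... + alpha_K)

The matrix version below applies this row-wise, so each row of the K x V input (one topic's lambda parameters) is treated as its own Dirichlet parameter vector.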
+ */ + private def dirichletExpectation(alpha : BDM[Double]): BDM[Double] = { val rowSum = sum(alpha(breeze.linalg.*, ::)) val digAlpha = digamma(alpha) val digRowSum = digamma(rowSum) From 61d60dfb2f6a614a97d327bed3183410a8a65700 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Tue, 28 Apr 2015 20:28:03 -0700 Subject: [PATCH 15/21] Minor cleanups: * Update *Concentration parameter documentation * EM Optimizer: createVertices() does not need to be a function * OnlineLDAOptimizer: typos in doc * Clean up the core code for online LDA (Scala style) --- .../apache/spark/mllib/clustering/LDA.scala | 37 ++++---- .../spark/mllib/clustering/LDAOptimizer.scala | 89 +++++++++---------- 2 files changed, 63 insertions(+), 63 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 70120b9d0192c..597f17b0972a0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -90,14 +90,15 @@ class LDA private ( * If set to -1, then docConcentration is set automatically. * (default = -1 = automatic) * - * Automatic setting of parameter: - * - For EM: default = (50 / k) + 1. - * - The 50/k is common in LDA libraries. - * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. - * - For Online: default = (1.0 / k). - * - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]]. - * - * Note: For EM optimizer, This value should be > 1.0. + * Optimizer-specific parameter settings: + * - EM + * - Value should be > 1.0 + * - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Value should be >= 0 + * - default = (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. */ def setDocConcentration(docConcentration: Double): this.type = { this.docConcentration = docConcentration @@ -117,8 +118,7 @@ class LDA private ( * This is the parameter to a symmetric Dirichlet distribution. * * Note: The topics' distributions over terms are called "beta" in the original LDA paper - * by Blei et al., but are ca - * lled "phi" in many later papers such as Asuncion et al., 2009. + * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009. */ def getTopicConcentration: Double = this.topicConcentration @@ -134,14 +134,15 @@ class LDA private ( * If set to -1, then topicConcentration is set automatically. * (default = -1 = automatic) * - * Automatic setting of parameter: - * - For EM: default = 0.1 + 1. - * - The 0.1 gives a small amount of smoothing. - * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM. - * - For Online: default = (1.0 / k). - * - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]]. - * - * Note: For EM optimizer, This value should be > 1.0. + * Optimizer-specific parameter settings: + * - EM + * - Value should be > 1.0 + * - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows + * Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Value should be >= 0 + * - default = (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
*/ def setTopicConcentration(topicConcentration: Double): this.type = { this.topicConcentration = topicConcentration diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index 6d2d93a525a9c..f8b386ed5e2e3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -81,7 +81,7 @@ class EMLDAOptimizer extends LDAOptimizer { import LDA._ /** - * Following fields will only be initialized through initialize method + * The following fields will only be initialized through the initialize() method */ private[clustering] var graph: Graph[TopicCounts, TokenCount] = null private[clustering] var k: Int = 0 @@ -94,7 +94,7 @@ class EMLDAOptimizer extends LDAOptimizer { /** * Compute bipartite term/doc graph. */ - private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={ + override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { val docConcentration = lda.getDocConcentration val topicConcentration = lda.getTopicConcentration @@ -121,7 +121,7 @@ class EMLDAOptimizer extends LDAOptimizer { // Create vertices. // Initially, we use random soft assignments of tokens to topics (random gamma). - def createVertices(): RDD[(VertexId, TopicCounts)] = { + val docTermVertices: RDD[(VertexId, TopicCounts)] = { val verticesTMP: RDD[(VertexId, TopicCounts)] = edges.mapPartitionsWithIndex { case (partIndex, partEdges) => val random = new Random(partIndex + randomSeed) @@ -134,8 +134,6 @@ class EMLDAOptimizer extends LDAOptimizer { verticesTMP.reduceByKey(_ + _) } - val docTermVertices = createVertices() - // Partition such that edges are grouped by document this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) this.k = k @@ -216,10 +214,10 @@ class EMLDAOptimizer extends LDAOptimizer { * :: Experimental :: * * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA - * algorithm, which processes a subset of the corpus on each iteration, and update the term-topic + * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic * distribution adaptively. * - * References: + * Original Online LDA paper: * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. */ @Experimental @@ -236,31 +234,30 @@ class OnlineLDAOptimizer extends LDAOptimizer { // Online LDA specific parameters private var tau_0: Double = 1024 private var kappa: Double = 0.51 - private var minibatchFraction: Double = 0.01 + private var miniBatchFraction: Double = 0.01 // internal data structure private var docs: RDD[(Long, Vector)] = null private var lambda: BDM[Double] = null - private var Elogbeta: BDM[Double]= null + private var Elogbeta: BDM[Double] = null private var expElogbeta: BDM[Double] = null // count of invocation to next, which helps deciding the weight for each iteration - private var iteration = 0 + private var iteration: Int = 0 /** * A (positive) learning parameter that downweights early iterations. Larger values make early - * iterations count less + * iterations count less. */ def getTau_0: Double = this.tau_0 /** * A (positive) learning parameter that downweights early iterations. 
Larger values make early - * iterations count less - * Automatic setting of parameter: - * - default = 1024, which follows the recommendation from OnlineLDA paper. + * iterations count less. + * Default: 1024, following the original Online LDA paper. */ def setTau_0(tau_0: Double): this.type = { - require(tau_0 > 0 || tau_0 == -1.0, s"LDA tau_0 must be positive, but was set to $tau_0") + require(tau_0 > 0, s"LDA tau_0 must be positive, but was set to $tau_0") this.tau_0 = tau_0 this } @@ -273,31 +270,32 @@ class OnlineLDAOptimizer extends LDAOptimizer { /** * Learning rate: exponential decay rate---should be between * (0.5, 1.0] to guarantee asymptotic convergence. - * - default = 0.51, which follows the recommendation from OnlineLDA paper. + * Default: 0.51, based on the original Online LDA paper. */ def setKappa(kappa: Double): this.type = { - require(kappa >= 0 || kappa == -1.0, - s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa") + require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa") this.kappa = kappa this } /** - * Mini-batch size, which controls how many documents are used in each iteration + * Mini-batch fraction, which sets the fraction of document sampled and used in each iteration */ - def getMiniBatchFraction: Double = this.minibatchFraction + def getMiniBatchFraction: Double = this.miniBatchFraction /** - * Mini-batch size, which controls how many documents are used in each iteration - * default = 1% from total documents. + * Mini-batch fraction in (0, 1], which sets the fraction of document sampled and used in + * each iteration. + * Default: 0.01, i.e., 1% of total documents */ def setMiniBatchFraction(miniBatchFraction: Double): this.type = { - this.minibatchFraction = miniBatchFraction + require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0, + s"Online LDA miniBatchFraction must be in range (0,1], but was set to $miniBatchFraction") + this.miniBatchFraction = miniBatchFraction this } - private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={ - + private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { this.k = lda.getK this.corpusSize = docs.count() this.vocabSize = docs.first()._2.size @@ -322,22 +320,23 @@ class OnlineLDAOptimizer extends LDAOptimizer { */ private[clustering] override def next(): OnlineLDAOptimizer = { iteration += 1 - val batch = docs.sample(true, minibatchFraction, randomGenerator.nextLong()) - if(batch.isEmpty()) return this + val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong()) + if (batch.isEmpty()) return this val k = this.k val vocabSize = this.vocabSize val expElogbeta = this.expElogbeta val alpha = this.alpha - val stats = batch.mapPartitions(docs =>{ + val stats: RDD[BDM[Double]] = batch.mapPartitions { docs => val stat = BDM.zeros[Double](k, vocabSize) - docs.foreach(doc =>{ + docs.foreach { doc => val termCounts = doc._2 - val (ids, cts) = termCounts match { - case v: DenseVector => (((0 until v.size).toList), v.values) + val (ids: List[Int], cts: Array[Double]) = termCounts match { + case v: DenseVector => ((0 until v.size).toList, v.values) case v: SparseVector => (v.indices.toList, v.values) - case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) + case v => throw new IllegalArgumentException("Online LDA does not support vector type " + + v.getClass) } // Initialize the variational distribution q(theta|gamma) for the 
mini-batch @@ -354,7 +353,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { while (meanchange > 1e-5) { val lastgamma = gammad // 1*K 1 * ids ids * k - gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha + gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha Elogthetad = digamma(gammad) - digamma(sum(gammad)) expElogthetad = exp(Elogthetad) phinorm = expElogthetad * expElogbetad + 1e-100 @@ -364,28 +363,28 @@ class OnlineLDAOptimizer extends LDAOptimizer { val m1 = expElogthetad.t.toDenseMatrix.t val m2 = (ctsVector / phinorm).t.toDenseMatrix val outerResult = kron(m1, m2) // K * ids - for (i <- 0 until ids.size) { + var i = 0 + while (i < ids.size) { stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) + i += 1 } - stat - }) + } Iterator(stat) - }) + } - val batchResult = stats.reduce(_ += _) - update(batchResult, iteration, (minibatchFraction * corpusSize).toInt) + val batchResult: BDM[Double] = stats.reduce(_ += _) + update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt) this } - private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = { + override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = { new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) } /** * Update lambda based on the batch submitted. batchSize can be different for each iteration. */ - private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={ - + private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = { val tau_0 = this.getTau_0 val kappa = this.getKappa @@ -405,17 +404,17 @@ class OnlineLDAOptimizer extends LDAOptimizer { /** * Get a random matrix to initialize lambda */ - private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ + private def getGammaMatrix(row: Int, col: Int): BDM[Double] = { val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) val temp = gammaRandomGenerator.sample(row * col).toArray - (new BDM[Double](col, row, temp)).t + new BDM[Double](col, row, temp).t } /** * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation * uses digamma which is accurate but expensive. */ - private def dirichletExpectation(alpha : BDM[Double]): BDM[Double] = { + private def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = { val rowSum = sum(alpha(breeze.linalg.*, ::)) val digAlpha = digamma(alpha) val digRowSum = digamma(rowSum) From 9e910d972daf1234e407ee0c39dc40a2690f6908 Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Tue, 28 Apr 2015 20:35:17 -0700 Subject: [PATCH 16/21] small fix --- .../org/apache/spark/mllib/clustering/LDAOptimizer.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index f8b386ed5e2e3..b6182be02fb56 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -145,7 +145,7 @@ class EMLDAOptimizer extends LDAOptimizer { this } - private[clustering] override def next(): EMLDAOptimizer = { + override private[clustering] def next(): EMLDAOptimizer = { require(graph != null, "graph is null, EMLDAOptimizer not initialized.") val eta = topicConcentration @@ -202,7 +202,7 @@ class EMLDAOptimizer extends LDAOptimizer { graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _) } - private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = { + override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = { require(graph != null, "graph is null, EMLDAOptimizer not initialized.") this.graphCheckpointer.deleteAllCheckpoints() new DistributedLDAModel(this, iterationTimes) @@ -295,7 +295,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { this } - private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { + override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { this.k = lda.getK this.corpusSize = docs.count() this.vocabSize = docs.first()._2.size @@ -318,7 +318,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { * model, and it will update the topic distribution adaptively for the terms appearing in the * subset. 
*/ - private[clustering] override def next(): OnlineLDAOptimizer = { + override private[clustering] def next(): OnlineLDAOptimizer = { iteration += 1 val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong()) if (batch.isEmpty()) return this From 4041723d4fe0dcfab845c9d2a2c72be2ed87895e Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 30 Apr 2015 00:58:41 +0800 Subject: [PATCH 17/21] add ut --- .../spark/mllib/clustering/LDAOptimizer.scala | 64 +++++++++----- .../spark/mllib/clustering/LDASuite.scala | 85 ++++++++++++++++++- 2 files changed, 125 insertions(+), 24 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index b6182be02fb56..d0a299cb894b2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -227,8 +227,8 @@ class OnlineLDAOptimizer extends LDAOptimizer { private var k: Int = 0 private var corpusSize: Long = 0 private var vocabSize: Int = 0 - private var alpha: Double = 0 - private var eta: Double = 0 + private[clustering] var alpha: Double = 0 + private[clustering] var eta: Double = 0 private var randomGenerator: java.util.Random = null // Online LDA specific parameters @@ -238,12 +238,11 @@ class OnlineLDAOptimizer extends LDAOptimizer { // internal data structure private var docs: RDD[(Long, Vector)] = null - private var lambda: BDM[Double] = null - private var Elogbeta: BDM[Double] = null - private var expElogbeta: BDM[Double] = null + private[clustering] var lambda: BDM[Double] = null // count of invocation to next, which helps deciding the weight for each iteration private var iteration: Int = 0 + private var gammaShape: Double = 100 /** * A (positive) learning parameter that downweights early iterations. Larger values make early @@ -295,7 +294,24 @@ class OnlineLDAOptimizer extends LDAOptimizer { this } - override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { + /** + * The function is for test only now. In the future, it can help support training strop/resume + */ + private[clustering] def setLambda(lambda: BDM[Double]): this.type = { + this.lambda = lambda + this + } + + /** + * Used to control the gamma distribution. Larger value produces values closer to 1.0. + */ + private[clustering] def setGammaShape(shape: Double): this.type = { + this.gammaShape = shape + this + } + + override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): + OnlineLDAOptimizer = { this.k = lda.getK this.corpusSize = docs.count() this.vocabSize = docs.first()._2.size @@ -307,26 +323,30 @@ class OnlineLDAOptimizer extends LDAOptimizer { // Initialize the variational distribution q(beta|lambda) this.lambda = getGammaMatrix(k, vocabSize) - this.Elogbeta = dirichletExpectation(lambda) - this.expElogbeta = exp(Elogbeta) this.iteration = 0 this } + override private[clustering] def next(): OnlineLDAOptimizer = { + val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong()) + if (batch.isEmpty()) return this + submitMiniBatch(batch) + } + + /** * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA * model, and it will update the topic distribution adaptively for the terms appearing in the * subset. 
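The per-document loop inside this method is the standard variational E-step from Hoffman et al. (2010), iterated to convergence for every document d in the mini-batch (notation follows the paper):

    phi_dwk   proportional to   exp( E[log theta_dk] + E[log beta_kw] )
    gamma_dk  =  alpha + sum over w of ( n_dw * phi_dwk )

where n_dw is the count of term w in document d. The code never materializes phi explicitly; both lines are folded into the vectorized gammad update, and the matching sufficient statistics are accumulated into stat for the M-step performed in update().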
*/ - override private[clustering] def next(): OnlineLDAOptimizer = { + private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer = { iteration += 1 - val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong()) - if (batch.isEmpty()) return this - val k = this.k val vocabSize = this.vocabSize - val expElogbeta = this.expElogbeta + val Elogbeta = dirichletExpectation(lambda) + val expElogbeta = exp(Elogbeta) val alpha = this.alpha + val gammaShape = this.gammaShape val stats: RDD[BDM[Double]] = batch.mapPartitions { docs => val stat = BDM.zeros[Double](k, vocabSize) @@ -340,7 +360,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { } // Initialize the variational distribution q(theta|gamma) for the mini-batch - var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K + var gammad = new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k).t // 1 * K var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 * K var expElogthetad = exp(Elogthetad) // 1 * K val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids @@ -350,7 +370,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { val ctsVector = new BDV[Double](cts).t // 1 * ids // Iterate between gamma and phi until convergence - while (meanchange > 1e-5) { + while (meanchange > 1e-3) { val lastgamma = gammad // 1*K 1 * ids ids * k gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha @@ -372,7 +392,10 @@ class OnlineLDAOptimizer extends LDAOptimizer { Iterator(stat) } - val batchResult: BDM[Double] = stats.reduce(_ += _) + val statsSum: BDM[Double] = stats.reduce(_ += _) + val batchResult = statsSum :* expElogbeta + + // Note that this is an optimization to avoid batch.count update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt) this } @@ -384,28 +407,23 @@ class OnlineLDAOptimizer extends LDAOptimizer { /** * Update lambda based on the batch submitted. batchSize can be different for each iteration. */ - private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = { + private[clustering] def update(stat: BDM[Double], iter: Int, batchSize: Int): Unit = { val tau_0 = this.getTau_0 val kappa = this.getKappa // weight of the mini-batch. val weight = math.pow(tau_0 + iter, -kappa) - // This step finishes computing the sufficient statistics for the M step - val stat = raw :* expElogbeta - // Update lambda based on documents. 
lambda = lambda * (1 - weight) + (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight - Elogbeta = dirichletExpectation(lambda) - expElogbeta = exp(Elogbeta) } /** * Get a random matrix to initialize lambda */ private def getGammaMatrix(row: Int, col: Int): BDM[Double] = { - val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) + val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape) val temp = gammaRandomGenerator.sample(row * col).toArray new BDM[Double](col, row, temp).t } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 41ec794146c69..6b2293ba49f51 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.clustering +import breeze.linalg.{DenseMatrix => BDM} + import org.scalatest.FunSuite import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors} @@ -54,7 +56,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { } } - test("running and DistributedLDAModel") { + test("running and DistributedLDAModel with default Optimizer (EM)") { val k = 3 val topicSmoothing = 1.2 val termSmoothing = 1.2 @@ -131,6 +133,87 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { assert(lda.getBeta === 3.0) assert(lda.getTopicConcentration === 3.0) } + + test("OnlineLDAOptimizer initialization") { + val lda = new LDA().setK(2) + val corpus = sc.parallelize(tinyCorpus, 2) + val op = new OnlineLDAOptimizer().initialize(corpus, lda) + op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau_0(567) + assert(op.alpha == 0.5) // default 1.0 / k + assert(op.eta == 0.5) // default 1.0 / k + assert(op.getKappa == 0.9876) + assert(op.getMiniBatchFraction == 0.123) + assert(op.getTau_0 == 567) + } + + test("OnlineLDAOptimizer one iteration") { + // run OnlineLDAOptimizer for 1 iteration to verify it's consistency with Blei-lab, + // [[https://github.com/Blei-Lab/onlineldavb]] + val k = 2 + val vocabSize = 6 + + def docs: Array[(Long, Vector)] = Array( + Vectors.sparse(vocabSize, Array(0, 1, 2), Array(1, 1, 1)), // apple, orange, banana + Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1))) // tiger, cat, dog + .zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + val corpus = sc.parallelize(docs, 2) + + // setGammaShape large so to avoid the stochastic impact. 
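(Why the large shape makes this deterministic: the initial gamma entries are drawn from Gamma(shape, 1/shape), which has mean 1 and variance 1/shape, so with shape around 1e40 every draw is numerically 1.0 and the check no longer depends on the random initialization.)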
+ val op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51).setGammaShape(1e40) + .setMiniBatchFraction(1) + val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op) + + val state = op.initialize(corpus, lda) + // override lambda to simulate an intermediate state + // [[ 1.1 1.2 1.3 0.9 0.8 0.7] + // [ 0.9 0.8 0.7 1.1 1.2 1.3]] + op.setLambda(new BDM[Double](k, vocabSize, + Array(1.1, 0.9, 1.2, 0.8, 1.3, 0.7, 0.9, 1.1, 0.8, 1.2, 0.7, 1.3))) + + // run for one iteration + state.submitMiniBatch(corpus) + + // verify the result, Note this generate the identical result as + // [[https://github.com/Blei-Lab/onlineldavb]] + val topic1 = op.lambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ") + val topic2 = op.lambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ") + assert("1.1101, 1.2076, 1.3050, 0.8899, 0.7924, 0.6950" == topic1) + assert("0.8899, 0.7924, 0.6950, 1.1101, 1.2076, 1.3050" == topic2) + } + + test("OnlineLDAOptimizer with toy data") { + def toydata: Array[(Long, Vector)] = Array( + Vectors.sparse(6, Array(0, 1), Array(1, 1)), + Vectors.sparse(6, Array(1, 2), Array(1, 1)), + Vectors.sparse(6, Array(0, 2), Array(1, 1)), + + Vectors.sparse(6, Array(3, 4), Array(1, 1)), + Vectors.sparse(6, Array(3, 5), Array(1, 1)), + Vectors.sparse(6, Array(4, 5), Array(1, 1)) + ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + + val docs = sc.parallelize(toydata) + val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau_0(1024).setKappa(0.51) + .setGammaShape(1e10) + val lda = new LDA().setK(2) + .setDocConcentration(0.01) + .setTopicConcentration(0.01) + .setMaxIterations(100) + .setOptimizer(op) + + val ldaModel = lda.run(docs) + val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10) + val topics = topicIndices.map { case (terms, termWeights) => + terms.zip(termWeights) + } + + // check distribution for each topic, typical distribution is (0.3, 0.3, 0.3, 0.02, 0.02, 0.02) + topics.foreach(topic =>{ + val smalls = topic.filter(t => (t._2 < 0.1)).map(_._2) + assert(smalls.size == 3 && smalls.sum < 0.2) + }) + } + } private[clustering] object LDASuite { From 68c2318f4bba22d86a895f1c9b76e2a431c3241b Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Thu, 30 Apr 2015 10:31:39 +0800 Subject: [PATCH 18/21] add a java ut --- .../spark/mllib/clustering/JavaLDASuite.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index fbe171b4b1ab1..2e8a9e99f6d01 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -109,6 +109,37 @@ public void distributedLDAModel() { assert(model.logPrior() < 0.0); } + + @Test + public void OnlineOptimizerCompatibility() { + int k = 3; + double topicSmoothing = 1.2; + double termSmoothing = 1.2; + + // Train a model + OnlineLDAOptimizer op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51) + .setGammaShape(1e40).setMiniBatchFraction(0.5); + LDA lda = new LDA(); + lda.setK(k) + .setDocConcentration(topicSmoothing) + .setTopicConcentration(termSmoothing) + .setMaxIterations(5) + .setSeed(12345) + .setOptimizer(op); + + LDAModel model = lda.run(corpus); + + // Check: basic parameters + assertEquals(model.k(), k); + assertEquals(model.vocabSize(), tinyVocabSize); + + // Check: topic summaries + Tuple2[] roundedTopicSummary = 
model.describeTopics(); + assertEquals(roundedTopicSummary.length, k); + Tuple2[] roundedLocalTopicSummary = model.describeTopics(); + assertEquals(roundedLocalTopicSummary.length, k); + } + private static int tinyK = LDASuite$.MODULE$.tinyK(); private static int tinyVocabSize = LDASuite$.MODULE$.tinyVocabSize(); private static Matrix tinyTopics = LDASuite$.MODULE$.tinyTopics(); From 54cf8dabaf5f30fa7ca554c20843f1057f7c83d5 Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Fri, 1 May 2015 10:02:00 +0800 Subject: [PATCH 19/21] some style change --- .../spark/mllib/clustering/LDAOptimizer.scala | 8 +-- .../spark/mllib/clustering/JavaLDASuite.java | 57 ++++++++++--------- 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index d0a299cb894b2..b352847ab6cc2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -295,7 +295,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { } /** - * The function is for test only now. In the future, it can help support training strop/resume + * The function is for test only now. In the future, it can help support training stop/resume */ private[clustering] def setLambda(lambda: BDM[Double]): this.type = { this.lambda = lambda @@ -310,8 +310,9 @@ class OnlineLDAOptimizer extends LDAOptimizer { this } - override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): - OnlineLDAOptimizer = { + override private[clustering] def initialize( + docs: RDD[(Long, Vector)], + lda: LDA): OnlineLDAOptimizer = { this.k = lda.getK this.corpusSize = docs.count() this.vocabSize = docs.first()._2.size @@ -333,7 +334,6 @@ class OnlineLDAOptimizer extends LDAOptimizer { submitMiniBatch(batch) } - /** * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA * model, and it will update the topic distribution adaptively for the terms appearing in the diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index 2e8a9e99f6d01..dd61f054b18c7 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -109,35 +109,38 @@ public void distributedLDAModel() { assert(model.logPrior() < 0.0); } - @Test public void OnlineOptimizerCompatibility() { - int k = 3; - double topicSmoothing = 1.2; - double termSmoothing = 1.2; - - // Train a model - OnlineLDAOptimizer op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51) - .setGammaShape(1e40).setMiniBatchFraction(0.5); - LDA lda = new LDA(); - lda.setK(k) - .setDocConcentration(topicSmoothing) - .setTopicConcentration(termSmoothing) - .setMaxIterations(5) - .setSeed(12345) - .setOptimizer(op); - - LDAModel model = lda.run(corpus); - - // Check: basic parameters - assertEquals(model.k(), k); - assertEquals(model.vocabSize(), tinyVocabSize); - - // Check: topic summaries - Tuple2[] roundedTopicSummary = model.describeTopics(); - assertEquals(roundedTopicSummary.length, k); - Tuple2[] roundedLocalTopicSummary = model.describeTopics(); - assertEquals(roundedLocalTopicSummary.length, k); + int k = 3; + double topicSmoothing = 1.2; + double termSmoothing = 1.2; + + // Train a model + OnlineLDAOptimizer op = new 
OnlineLDAOptimizer() + .setTau_0(1024) + .setKappa(0.51) + .setGammaShape(1e40) + .setMiniBatchFraction(0.5); + + LDA lda = new LDA(); + lda.setK(k) + .setDocConcentration(topicSmoothing) + .setTopicConcentration(termSmoothing) + .setMaxIterations(5) + .setSeed(12345) + .setOptimizer(op); + + LDAModel model = lda.run(corpus); + + // Check: basic parameters + assertEquals(model.k(), k); + assertEquals(model.vocabSize(), tinyVocabSize); + + // Check: topic summaries + Tuple2[] roundedTopicSummary = model.describeTopics(); + assertEquals(roundedTopicSummary.length, k); + Tuple2[] roundedLocalTopicSummary = model.describeTopics(); + assertEquals(roundedLocalTopicSummary.length, k); } private static int tinyK = LDASuite$.MODULE$.tinyK(); From 6149ca6438b0e625e364953a5dda103c73d7c8ab Mon Sep 17 00:00:00 2001 From: Yuhao Yang Date: Sat, 2 May 2015 00:48:14 +0800 Subject: [PATCH 20/21] fix for setOptimizer --- .../main/scala/org/apache/spark/mllib/clustering/LDA.scala | 5 +++-- .../org/apache/spark/mllib/clustering/LDAOptimizer.scala | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 597f17b0972a0..c8daa2388e868 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -210,14 +210,15 @@ class LDA private ( /** * Set the LDAOptimizer used to perform the actual calculation by algorithm name. - * Currently "em" is supported. + * Currently "em", "online" is supported. */ def setOptimizer(optimizerName: String): this.type = { this.ldaOptimizer = optimizerName.toLowerCase match { case "em" => new EMLDAOptimizer + case "online" => new OnlineLDAOptimizer case other => - throw new IllegalArgumentException(s"Only em is supported but got $other.") + throw new IllegalArgumentException(s"Only em, online are supported but got $other.") } this } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index b352847ab6cc2..4353463aca050 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -396,7 +396,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { val batchResult = statsSum :* expElogbeta // Note that this is an optimization to avoid batch.count - update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt) + update(batchResult, iteration, (miniBatchFraction * corpusSize).ceil.toInt) this } From cf376ffbd1eddb3eff9021b9d925afdef873ef22 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Sat, 2 May 2015 16:06:44 -0700 Subject: [PATCH 21/21] =?UTF-8?q?For=20private=20vars=20needed=20for=20tes?= =?UTF-8?q?ting,=20I=20made=20them=20private=20and=20added=20accessors.=20?= =?UTF-8?q?=20Java=20doesn=E2=80=99t=20understand=20package-private=20tags?= =?UTF-8?q?,=20so=20this=20minimizes=20the=20issues=20Java=20users=20might?= =?UTF-8?q?=20encounter.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change miniBatchFraction default to 0.05 to match maxIterations. Added a little doc. Changed end of main online LDA update code to avoid the kron() call. Please confirm if you agree that should be more efficient (not explicitly instantiating a big matrix). Changed Gamma() to use random seed. 
Scala style updates --- .../spark/mllib/clustering/LDAOptimizer.scala | 56 ++++++++++++++----- .../spark/mllib/clustering/JavaLDASuite.java | 4 +- .../spark/mllib/clustering/LDASuite.scala | 30 +++++----- 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index 4353463aca050..093aa0f315ab2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -21,7 +21,7 @@ import java.util.Random import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum, normalize, kron} import breeze.numerics.{digamma, exp, abs} -import breeze.stats.distributions.Gamma +import breeze.stats.distributions.{Gamma, RandBasis} import org.apache.spark.annotation.Experimental import org.apache.spark.graphx._ @@ -227,20 +227,37 @@ class OnlineLDAOptimizer extends LDAOptimizer { private var k: Int = 0 private var corpusSize: Long = 0 private var vocabSize: Int = 0 - private[clustering] var alpha: Double = 0 - private[clustering] var eta: Double = 0 + + /** alias for docConcentration */ + private var alpha: Double = 0 + + /** (private[clustering] for debugging) Get docConcentration */ + private[clustering] def getAlpha: Double = alpha + + /** alias for topicConcentration */ + private var eta: Double = 0 + + /** (private[clustering] for debugging) Get topicConcentration */ + private[clustering] def getEta: Double = eta + private var randomGenerator: java.util.Random = null // Online LDA specific parameters + // Learning rate is: (tau_0 + t)^{-kappa} private var tau_0: Double = 1024 private var kappa: Double = 0.51 - private var miniBatchFraction: Double = 0.01 + private var miniBatchFraction: Double = 0.05 // internal data structure private var docs: RDD[(Long, Vector)] = null - private[clustering] var lambda: BDM[Double] = null - // count of invocation to next, which helps deciding the weight for each iteration + /** Dirichlet parameter for the posterior over topics */ + private var lambda: BDM[Double] = null + + /** (private[clustering] for debugging) Get parameter for topics */ + private[clustering] def getLambda: BDM[Double] = lambda + + /** Current iteration (count of invocations of [[next()]]) */ private var iteration: Int = 0 private var gammaShape: Double = 100 @@ -285,7 +302,12 @@ class OnlineLDAOptimizer extends LDAOptimizer { /** * Mini-batch fraction in (0, 1], which sets the fraction of document sampled and used in * each iteration. - * Default: 0.01, i.e., 1% of total documents + * + * Note that this should be adjusted in synch with [[LDA.setMaxIterations()]] + * so the entire corpus is used. Specifically, set both so that + * maxIterations * miniBatchFraction >= 1. + * + * Default: 0.05, i.e., 5% of total documents. */ def setMiniBatchFraction(miniBatchFraction: Double): this.type = { require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0, @@ -295,7 +317,9 @@ class OnlineLDAOptimizer extends LDAOptimizer { } /** - * The function is for test only now. In the future, it can help support training stop/resume + * (private[clustering]) + * Set the Dirichlet parameter for the posterior over topics. + * This is only used for testing now. In the future, it can help support training stop/resume. 
*/ private[clustering] def setLambda(lambda: BDM[Double]): this.type = { this.lambda = lambda @@ -303,7 +327,10 @@ class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Used to control the gamma distribution. Larger value produces values closer to 1.0. + * (private[clustering]) + * Used for random initialization of the variational parameters. + * Larger value produces values closer to 1.0. + * This is only used for testing currently. */ private[clustering] def setGammaShape(shape: Double): this.type = { this.gammaShape = shape @@ -380,12 +407,11 @@ class OnlineLDAOptimizer extends LDAOptimizer { meanchange = sum(abs(gammad - lastgamma)) / k } - val m1 = expElogthetad.t.toDenseMatrix.t - val m2 = (ctsVector / phinorm).t.toDenseMatrix - val outerResult = kron(m1, m2) // K * ids + val m1 = expElogthetad.t + val m2 = (ctsVector / phinorm).t.toDenseVector var i = 0 while (i < ids.size) { - stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) + stat(::, ids(i)) := stat(::, ids(i)) + m1 * m2(i) i += 1 } } @@ -423,7 +449,9 @@ class OnlineLDAOptimizer extends LDAOptimizer { * Get a random matrix to initialize lambda */ private def getGammaMatrix(row: Int, col: Int): BDM[Double] = { - val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape) + val randBasis = new RandBasis(new org.apache.commons.math3.random.MersenneTwister( + randomGenerator.nextLong())) + val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape)(randBasis) val temp = gammaRandomGenerator.sample(row * col).toArray new BDM[Double](col, row, temp).t } diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index dd61f054b18c7..f394d903966de 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -20,7 +20,6 @@ import java.io.Serializable; import java.util.ArrayList; -import org.apache.spark.api.java.JavaRDD; import scala.Tuple2; import org.junit.After; @@ -30,6 +29,7 @@ import org.junit.Test; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.Vector; @@ -148,6 +148,6 @@ public void OnlineOptimizerCompatibility() { private static Matrix tinyTopics = LDASuite$.MODULE$.tinyTopics(); private static Tuple2[] tinyTopicDescription = LDASuite$.MODULE$.tinyTopicDescription(); - JavaPairRDD corpus; + private JavaPairRDD corpus; } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 6b2293ba49f51..2dcc881f5abd2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -39,7 +39,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { // Check: describeTopics() with all terms val fullTopicSummary = model.describeTopics() - assert(fullTopicSummary.size === tinyK) + assert(fullTopicSummary.length === tinyK) fullTopicSummary.zip(tinyTopicDescription).foreach { case ((algTerms, algTermWeights), (terms, termWeights)) => assert(algTerms === terms) @@ -101,7 +101,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { // Check: per-doc topic distributions val topicDistributions = model.topicDistributions.collect() // 
Ensure all documents are covered. - assert(topicDistributions.size === tinyCorpus.size) + assert(topicDistributions.length === tinyCorpus.length) assert(tinyCorpus.map(_._1).toSet === topicDistributions.map(_._1).toSet) // Ensure we have proper distributions topicDistributions.foreach { case (docId, topicDistribution) => @@ -139,8 +139,8 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { val corpus = sc.parallelize(tinyCorpus, 2) val op = new OnlineLDAOptimizer().initialize(corpus, lda) op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau_0(567) - assert(op.alpha == 0.5) // default 1.0 / k - assert(op.eta == 0.5) // default 1.0 / k + assert(op.getAlpha == 0.5) // default 1.0 / k + assert(op.getEta == 0.5) // default 1.0 / k assert(op.getKappa == 0.9876) assert(op.getMiniBatchFraction == 0.123) assert(op.getTau_0 == 567) @@ -154,14 +154,14 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { def docs: Array[(Long, Vector)] = Array( Vectors.sparse(vocabSize, Array(0, 1, 2), Array(1, 1, 1)), // apple, orange, banana - Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1))) // tiger, cat, dog - .zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1)) // tiger, cat, dog + ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } val corpus = sc.parallelize(docs, 2) - // setGammaShape large so to avoid the stochastic impact. + // Set GammaShape large to avoid the stochastic impact. val op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51).setGammaShape(1e40) .setMiniBatchFraction(1) - val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op) + val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op).setSeed(12345) val state = op.initialize(corpus, lda) // override lambda to simulate an intermediate state @@ -175,8 +175,8 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { // verify the result, Note this generate the identical result as // [[https://github.com/Blei-Lab/onlineldavb]] - val topic1 = op.lambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ") - val topic2 = op.lambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ") + val topic1 = op.getLambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ") + val topic2 = op.getLambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ") assert("1.1101, 1.2076, 1.3050, 0.8899, 0.7924, 0.6950" == topic1) assert("0.8899, 0.7924, 0.6950, 1.1101, 1.2076, 1.3050" == topic2) } @@ -186,7 +186,6 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { Vectors.sparse(6, Array(0, 1), Array(1, 1)), Vectors.sparse(6, Array(1, 2), Array(1, 1)), Vectors.sparse(6, Array(0, 2), Array(1, 1)), - Vectors.sparse(6, Array(3, 4), Array(1, 1)), Vectors.sparse(6, Array(3, 5), Array(1, 1)), Vectors.sparse(6, Array(4, 5), Array(1, 1)) @@ -200,6 +199,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { .setTopicConcentration(0.01) .setMaxIterations(100) .setOptimizer(op) + .setSeed(12345) val ldaModel = lda.run(docs) val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10) @@ -208,10 +208,10 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { } // check distribution for each topic, typical distribution is (0.3, 0.3, 0.3, 0.02, 0.02, 0.02) - topics.foreach(topic =>{ - val smalls = topic.filter(t => (t._2 < 0.1)).map(_._2) - assert(smalls.size == 3 && smalls.sum < 0.2) - }) + topics.foreach { topic => + val smalls = topic.filter(t => t._2 < 
0.1).map(_._2) + assert(smalls.length == 3 && smalls.sum < 0.2) + } } }
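
For readers following the series, the patches above exercise OnlineLDAOptimizer only through the Scala and Java test suites. The sketch below shows the intended end-to-end call path. It is illustrative only: it assumes an existing SparkContext named sc and a toy term-count corpus, and it uses only the public setters introduced in these patches (setTau_0, setKappa, setMiniBatchFraction); setGammaShape stays package-private and is meant for tests. As the new doc on setMiniBatchFraction notes, choose maxIterations so that maxIterations * miniBatchFraction >= 1, otherwise part of the corpus is never sampled.

import org.apache.spark.mllib.clustering.{LDA, LDAModel, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Toy corpus of (docId, termCounts) pairs over a 6-term vocabulary.
val corpus: RDD[(Long, Vector)] = sc.parallelize(Seq(
  (0L, Vectors.sparse(6, Array(0, 1, 2), Array(1.0, 1.0, 1.0))),
  (1L, Vectors.sparse(6, Array(3, 4, 5), Array(1.0, 1.0, 1.0)))))

// Configure the online optimizer; the learning rate at iteration t is (tau_0 + t)^(-kappa).
val optimizer = new OnlineLDAOptimizer()
  .setTau_0(1024)
  .setKappa(0.51)
  .setMiniBatchFraction(0.05)

val model: LDAModel = new LDA()
  .setK(2)
  .setMaxIterations(20)      // 20 * 0.05 = 1.0, so the corpus is covered in expectation
  .setSeed(12345)
  .setOptimizer(optimizer)   // or .setOptimizer("online") to use the optimizer defaults
  .run(corpus)

// The online path returns a LocalLDAModel; print the top terms per topic.
model.describeTopics(maxTermsPerTopic = 3).foreach { case (terms, weights) =>
  println(terms.zip(weights).mkString(", "))
}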
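The final commit also documents the learning rate on the new private fields and reworks the end of the update to avoid the kron() call. The following self-contained Breeze snippet illustrates just the update() blend with made-up sizes and statistics; the concrete values are hypothetical, only the blending formula mirrors the patch.

import breeze.linalg.{DenseMatrix => BDM}

// Hypothetical setup: 2 topics, 3 terms, a 1000-document corpus, a 50-document mini-batch.
val (k, vocabSize) = (2, 3)
val (tau0, kappa) = (1024.0, 0.51)
val (corpusSize, batchSize, eta) = (1000L, 50, 0.5)
val iteration = 7

// Learning rate for this iteration: (tau_0 + t)^(-kappa); it decays toward 0 as t grows,
// so later mini-batches move lambda less than early ones.
val weight = math.pow(tau0 + iteration, -kappa)

// stat stands in for the mini-batch sufficient statistics, already multiplied by expElogbeta.
var lambda = BDM.ones[Double](k, vocabSize)
val stat = BDM.fill(k, vocabSize)(0.1)

// Blend the old lambda with the rescaled mini-batch estimate, as in update().
lambda = lambda * (1.0 - weight) +
  (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight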