Commit d8cfd53
Feynman Liang authored and jkbradley committed
[SPARK-5567] [MLLIB] Add predict method to LocalLDAModel
jkbradley hhbyyh Adds `topicDistributions` to LocalLDAModel. Please review after apache#7757 is merged.

Author: Feynman Liang <[email protected]>

Closes apache#7760 from feynmanliang/SPARK-5567-predict-in-LDA and squashes the following commits:

0ad1134 [Feynman Liang] Remove println
27b3877 [Feynman Liang] Code review fixes
6bfb87c [Feynman Liang] Remove extra newline
476f788 [Feynman Liang] Fix checks and doc for variationalInference
061780c [Feynman Liang] Code review cleanup
3be2947 [Feynman Liang] Rename topicDistribution -> topicDistributions
2a821a6 [Feynman Liang] Add predict methods to LocalLDAModel
1 parent a20e743 commit d8cfd53
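
As a quick illustration of the API this commit adds, here is a hedged usage sketch (the names `sc`, `model`, and the toy documents are assumptions for illustration; `model` stands for any trained LocalLDAModel):

  import org.apache.spark.mllib.linalg.{Vector, Vectors}
  import org.apache.spark.rdd.RDD

  // Term-count vectors over a 6-word vocabulary, keyed by document ID.
  val docs: RDD[(Long, Vector)] = sc.parallelize(Seq(
    (0L, Vectors.sparse(6, Array(0, 1), Array(1.0, 1.0))),
    (1L, Vectors.sparse(6, Array(3, 5), Array(1.0, 1.0)))))

  // New in this commit: per-document topic mixtures ("theta"), inferred
  // variationally; empty documents map to a zero vector.
  val theta: RDD[(Long, Vector)] = model.topicDistributions(docs)
  theta.collect().foreach { case (id, dist) => println(s"$id -> $dist") }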

3 files changed: +102 -8

mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala

Lines changed: 36 additions & 6 deletions
@@ -186,7 +186,6 @@ abstract class LDAModel private[clustering] extends Saveable {
  * This model stores only the inferred topics.
  * It may be used for computing topics for new documents, but it may give less accurate answers
  * than the [[DistributedLDAModel]].
- *
  * @param topics Inferred topics (vocabSize x k matrix).
  */
 @Experimental
@@ -221,9 +220,6 @@ class LocalLDAModel private[clustering] (
   // TODO
   // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???
 
-  // TODO:
-  // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
-
   /**
    * Calculate the log variational bound on perplexity. See Equation (16) in original Online
    * LDA paper.
@@ -269,15 +265,15 @@ class LocalLDAModel private[clustering] (
     // by topic (columns of lambda)
     val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t
 
-    var score = documents.filter(_._2.numActives > 0).map { case (id: Long, termCounts: Vector) =>
+    var score = documents.filter(_._2.numNonzeros > 0).map { case (id: Long, termCounts: Vector) =>
       var docScore = 0.0D
       val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference(
         termCounts, exp(Elogbeta), brzAlpha, gammaShape, k)
       val Elogthetad: BDV[Double] = LDAUtils.dirichletExpectation(gammad)
 
       // E[log p(doc | theta, beta)]
       termCounts.foreachActive { case (idx, count) =>
-        docScore += LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
+        docScore += count * LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
       }
       // E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector
       docScore += sum((brzAlpha - gammad) :* Elogthetad)
@@ -297,6 +293,40 @@ class LocalLDAModel private[clustering] (
     score
   }
 
+  /**
+   * Predicts the topic mixture distribution for each document (often called "theta" in the
+   * literature). Returns a vector of zeros for an empty document.
+   *
+   * This uses a variational approximation following Hoffman et al. (2010), where the approximate
+   * distribution is called "gamma." Technically, this method returns this approximation "gamma"
+   * for each document.
+   * @param documents documents to predict topic mixture distributions for
+   * @return An RDD of (document ID, topic mixture distribution for document)
+   */
+  // TODO: declare in LDAModel and override once implemented in DistributedLDAModel
+  def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = {
+    // Double transpose because dirichletExpectation normalizes by row and we need to normalize
+    // by topic (columns of lambda)
+    val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t)
+    val docConcentrationBrz = this.docConcentration.toBreeze
+    val gammaShape = this.gammaShape
+    val k = this.k
+
+    documents.map { case (id: Long, termCounts: Vector) =>
+      if (termCounts.numNonzeros == 0) {
+        (id, Vectors.zeros(k))
+      } else {
+        val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
+          termCounts,
+          expElogbeta,
+          docConcentrationBrz,
+          gammaShape,
+          k)
+        (id, Vectors.dense(normalize(gamma, 1.0).toArray))
+      }
+    }
+  }
+
 }
 
 
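Two things change in logPerplexity above: documents are filtered on numNonzeros rather than numActives, and each word's contribution to E[log p(doc | theta, beta)] is now weighted by its count. A minimal standalone sketch of the corrected per-word term (Breeze only; `logSumExp` mirrors what LDAUtils.logSumExp computes, and the function names and inputs here are illustrative, not part of the patch):

  import breeze.linalg.{max, sum, DenseVector => BDV}
  import breeze.numerics.exp

  // Numerically stable log(sum(exp(x))), as used in the variational bound.
  def logSumExp(x: BDV[Double]): Double = {
    val m = max(x)
    m + math.log(sum(exp(x - m)))
  }

  // The bound sums over token occurrences, so a word appearing `count` times
  // must contribute `count` times its per-token term; before this patch the
  // count factor was missing.
  def wordTerm(count: Double, eLogThetaD: BDV[Double], eLogBetaW: BDV[Double]): Double =
    count * logSumExp(eLogThetaD + eLogBetaW)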

mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala

Lines changed: 3 additions & 2 deletions
@@ -394,7 +394,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val gammaShape = this.gammaShape
 
     val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
-      val nonEmptyDocs = docs.filter(_._2.numActives > 0)
+      val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
       val stat = BDM.zeros[Double](k, vocabSize)
       var gammaPart = List[BDV[Double]]()
@@ -461,7 +461,8 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 private[clustering] object OnlineLDAOptimizer {
   /**
    * Uses variational inference to infer the topic distribution `gammad` given the term counts
-   * for a document. `termCounts` must be non-empty, otherwise Breeze will throw a BLAS error.
+   * for a document. `termCounts` must contain at least one non-zero entry, otherwise Breeze will
+   * throw a BLAS error.
    *
    * An optimization (Lee, Seung: Algorithms for non-negative matrix factorization, NIPS 2001)
    * avoids explicit computation of variational parameter `phi`.
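
The numActives -> numNonzeros change matters because a sparse vector may store explicit zero values: numActives counts stored entries, while numNonzeros counts entries that are actually non-zero, and variationalTopicInference needs at least one non-zero count to avoid the BLAS error mentioned above. A small standalone illustration (not part of the patch):

  import org.apache.spark.mllib.linalg.Vectors

  // A sparse vector with two stored entries, one of which is an explicit zero.
  val v = Vectors.sparse(3, Array(0, 2), Array(0.0, 4.0))
  println(v.numActives)   // 2 -- counts stored entries, including the zero
  println(v.numNonzeros)  // 1 -- counts only genuinely non-zero values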

mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala

Lines changed: 63 additions & 0 deletions
@@ -242,6 +242,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     val alpha = 0.01
     val eta = 0.01
     val gammaShape = 100
+    // obtained from LDA model trained in gensim, see below
     val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
       1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
       0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
@@ -281,6 +282,68 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(ldaModel.logPerplexity(docs) ~== -3.690D relTol 1E-3D)
   }
 
+  test("LocalLDAModel predict") {
+    val k = 2
+    val vocabSize = 6
+    val alpha = 0.01
+    val eta = 0.01
+    val gammaShape = 100
+    // obtained from LDA model trained in gensim, see below
+    val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
+      1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
+      0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
+
+    def toydata: Array[(Long, Vector)] = Array(
+      Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+      Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+      Vectors.sparse(6, Array(4, 5), Array(1, 1))
+    ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+    val docs = sc.parallelize(toydata)
+
+    val ldaModel: LocalLDAModel = new LocalLDAModel(
+      topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape)
+
+    /* Verify results using gensim:
+      import numpy as np
+      from gensim import models
+      corpus = [
+        [(0, 1.0), (1, 1.0)],
+        [(1, 1.0), (2, 1.0)],
+        [(0, 1.0), (2, 1.0)],
+        [(3, 1.0), (4, 1.0)],
+        [(3, 1.0), (5, 1.0)],
+        [(4, 1.0), (5, 1.0)]]
+      np.random.seed(2345)
+      lda = models.ldamodel.LdaModel(
+        corpus=corpus, alpha=0.01, eta=0.01, num_topics=2, update_every=0, passes=100,
+        decay=0.51, offset=1024)
+      print(list(lda.get_document_topics(corpus)))
+      > [[(0, 0.99504950495049516)], [(0, 0.99504950495049516)],
+      > [(0, 0.99504950495049516)], [(1, 0.99504950495049516)],
+      > [(1, 0.99504950495049516)], [(1, 0.99504950495049516)]]
+     */
+
+    val expectedPredictions = List(
+      (0, 0.99504), (0, 0.99504),
+      (0, 0.99504), (1, 0.99504),
+      (1, 0.99504), (1, 0.99504))
+
+    val actualPredictions = ldaModel.topicDistributions(docs).map { case (id, topics) =>
+      // convert results to expectedPredictions format, which only has highest probability topic
+      val topicsBz = topics.toBreeze.toDenseVector
+      (id, (argmax(topicsBz), max(topicsBz)))
+    }.sortByKey()
+      .values
+      .collect()
+
+    expectedPredictions.zip(actualPredictions).forall { case (expected, actual) =>
+      expected._1 === actual._1 && (expected._2 ~== actual._2 relTol 1E-3D)
+    }
+  }
+
   test("OnlineLDAOptimizer with asymmetric prior") {
     def toydata: Array[(Long, Vector)] = Array(
       Vectors.sparse(6, Array(0, 1), Array(1, 1)),
